diff --git a/server/peer_server.go b/server/peer_server.go index 97b88a93d..dbcba9814 100644 --- a/server/peer_server.go +++ b/server/peer_server.go @@ -17,6 +17,7 @@ import ( "github.com/coreos/etcd/log" "github.com/coreos/etcd/store" "github.com/coreos/go-raft" + "github.com/gorilla/mux" ) type PeerServer struct { @@ -236,25 +237,27 @@ func (s *PeerServer) startAsFollower(cluster []string) { func (s *PeerServer) startTransport(scheme string, tlsConf tls.Config) error { log.Infof("raft server [name %s, listen on %s, advertised url %s]", s.name, s.listenHost, s.url) - raftMux := http.NewServeMux() + router := mux.NewRouter() s.httpServer = &http.Server{ - Handler: raftMux, + Handler: router, TLSConfig: &tlsConf, Addr: s.listenHost, } // internal commands - raftMux.HandleFunc("/name", s.NameHttpHandler) - raftMux.HandleFunc("/version", s.VersionHttpHandler) - raftMux.HandleFunc("/join", s.JoinHttpHandler) - raftMux.HandleFunc("/remove/", s.RemoveHttpHandler) - raftMux.HandleFunc("/vote", s.VoteHttpHandler) - raftMux.HandleFunc("/log", s.GetLogHttpHandler) - raftMux.HandleFunc("/log/append", s.AppendEntriesHttpHandler) - raftMux.HandleFunc("/snapshot", s.SnapshotHttpHandler) - raftMux.HandleFunc("/snapshotRecovery", s.SnapshotRecoveryHttpHandler) - raftMux.HandleFunc("/etcdURL", s.EtcdURLHttpHandler) + router.HandleFunc("/name", s.NameHttpHandler) + router.HandleFunc("/version", s.VersionHttpHandler) + router.HandleFunc("/version/{version:[0-9]+}/check", s.VersionCheckHttpHandler) + router.HandleFunc("/upgrade", s.UpgradeHttpHandler) + router.HandleFunc("/join", s.JoinHttpHandler) + router.HandleFunc("/remove/{name:.+}", s.RemoveHttpHandler) + router.HandleFunc("/vote", s.VoteHttpHandler) + router.HandleFunc("/log", s.GetLogHttpHandler) + router.HandleFunc("/log/append", s.AppendEntriesHttpHandler) + router.HandleFunc("/snapshot", s.SnapshotHttpHandler) + router.HandleFunc("/snapshotRecovery", s.SnapshotRecoveryHttpHandler) + router.HandleFunc("/etcdURL", s.EtcdURLHttpHandler) if scheme == "http" { return s.listenAndServe() @@ -283,6 +286,29 @@ func getVersion(t *transporter, versionURL url.URL) (int, error) { return version, nil } +// Upgradable checks whether all peers in a cluster support an upgrade to the next store version. +func (s *PeerServer) Upgradable() error { + nextVersion := s.store.Version() + 1 + for _, peerURL := range s.registry.PeerURLs(s.raftServer.Leader(), s.name) { + u, err := url.Parse(peerURL) + if err != nil { + return fmt.Errorf("PeerServer: Cannot parse URL: '%s' (%s)", peerURL, err) + } + + t, _ := s.raftServer.Transporter().(*transporter) + checkURL := (&url.URL{Host: u.Host, Scheme: s.tlsConf.Scheme, Path: fmt.Sprintf("/version/%d/check", nextVersion)}).String() + resp, _, err := t.Get(checkURL) + if err != nil { + return fmt.Errorf("PeerServer: Cannot check version compatibility: %s", u.Host) + } + if resp.StatusCode != 200 { + return fmt.Errorf("PeerServer: Version %d is not compatible with peer: %s", nextVersion, u.Host) + } + } + + return nil +} + func (s *PeerServer) joinCluster(cluster []string) bool { for _, machine := range cluster { if len(machine) == 0 { diff --git a/server/peer_server_handlers.go b/server/peer_server_handlers.go index b4b0a8b17..be665dbf5 100644 --- a/server/peer_server_handlers.go +++ b/server/peer_server_handlers.go @@ -7,7 +7,9 @@ import ( etcdErr "github.com/coreos/etcd/error" "github.com/coreos/etcd/log" + "github.com/coreos/etcd/store" "github.com/coreos/go-raft" + "github.com/gorilla/mux" ) // Get all the current logs @@ -134,9 +136,9 @@ func (ps *PeerServer) RemoveHttpHandler(w http.ResponseWriter, req *http.Request return } - nodeName := req.URL.Path[len("/remove/"):] + vars := mux.Vars(req) command := &RemoveCommand{ - Name: nodeName, + Name: vars["name"], } log.Debugf("[recv] Remove Request [%s]", command.Name) @@ -157,3 +159,35 @@ func (ps *PeerServer) VersionHttpHandler(w http.ResponseWriter, req *http.Reques w.WriteHeader(http.StatusOK) w.Write([]byte(strconv.Itoa(ps.store.Version()))) } + +// Checks whether a given version is supported. +func (ps *PeerServer) VersionCheckHttpHandler(w http.ResponseWriter, req *http.Request) { + log.Debugf("[recv] Get %s%s ", ps.url, req.URL.Path) + vars := mux.Vars(req) + version, _ := strconv.Atoi(vars["version"]) + if version >= store.MinVersion() && version <= store.MaxVersion() { + w.WriteHeader(http.StatusOK) + } else { + w.WriteHeader(http.StatusForbidden) + } +} + +// Upgrades the current store version to the next version. +func (ps *PeerServer) UpgradeHttpHandler(w http.ResponseWriter, req *http.Request) { + log.Debugf("[recv] Get %s/version", ps.url) + + // Check if upgrade is possible for all nodes. + if err := ps.Upgradable(); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + // Create an upgrade command from the current version. + c := ps.store.CommandFactory().CreateUpgradeCommand() + if err := ps.server.Dispatch(c, w, req); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusOK) +} diff --git a/store/command_factory.go b/store/command_factory.go index 07b169d49..9b52f42f9 100644 --- a/store/command_factory.go +++ b/store/command_factory.go @@ -15,6 +15,7 @@ var minVersion, maxVersion int // depending on the current version of the store. type CommandFactory interface { Version() int + CreateUpgradeCommand() raft.Command CreateSetCommand(key string, value string, expireTime time.Time) raft.Command CreateCreateCommand(key string, value string, expireTime time.Time, unique bool) raft.Command CreateUpdateCommand(key string, value string, expireTime time.Time) raft.Command diff --git a/store/v2/command_factory.go b/store/v2/command_factory.go index f69bd7f1b..4f0e7260c 100644 --- a/store/v2/command_factory.go +++ b/store/v2/command_factory.go @@ -20,6 +20,11 @@ func (f *CommandFactory) Version() int { return 2 } +// CreateUpgradeCommand is a no-op since version 2 is the first version to support store versioning. +func (f *CommandFactory) CreateUpgradeCommand() raft.Command { + return &raft.NOPCommand{} +} + // CreateSetCommand creates a version 2 command to set a key to a given value in the store. func (f *CommandFactory) CreateSetCommand(key string, value string, expireTime time.Time) raft.Command { return &SetCommand{ diff --git a/store/v2/compare_and_swap_command.go b/store/v2/compare_and_swap_command.go index be1bcf826..1de79fb1f 100644 --- a/store/v2/compare_and_swap_command.go +++ b/store/v2/compare_and_swap_command.go @@ -23,7 +23,7 @@ type CompareAndSwapCommand struct { // The name of the testAndSet command in the log func (c *CompareAndSwapCommand) CommandName() string { - return "etcd:v2:compareAndSwap" + return "etcd:compareAndSwap" } // Set the key-value pair if the current value of the key equals to the given prevValue diff --git a/store/v2/create_command.go b/store/v2/create_command.go index 8772e11f5..e187d99f7 100644 --- a/store/v2/create_command.go +++ b/store/v2/create_command.go @@ -22,7 +22,7 @@ type CreateCommand struct { // The name of the create command in the log func (c *CreateCommand) CommandName() string { - return "etcd:v2:create" + return "etcd:create" } // Create node diff --git a/store/v2/delete_command.go b/store/v2/delete_command.go index 861f91971..6bd48368f 100644 --- a/store/v2/delete_command.go +++ b/store/v2/delete_command.go @@ -18,7 +18,7 @@ type DeleteCommand struct { // The name of the delete command in the log func (c *DeleteCommand) CommandName() string { - return "etcd:v2:delete" + return "etcd:delete" } // Delete the key diff --git a/store/v2/set_command.go b/store/v2/set_command.go index 3d9a92f51..4f6ecf59f 100644 --- a/store/v2/set_command.go +++ b/store/v2/set_command.go @@ -21,7 +21,7 @@ type SetCommand struct { // The name of the create command in the log func (c *SetCommand) CommandName() string { - return "etcd:v2:set" + return "etcd:set" } // Create node diff --git a/store/v2/update_command.go b/store/v2/update_command.go index e1136ddac..d080ecced 100644 --- a/store/v2/update_command.go +++ b/store/v2/update_command.go @@ -20,7 +20,7 @@ type UpdateCommand struct { // The name of the update command in the log func (c *UpdateCommand) CommandName() string { - return "etcd:v2:update" + return "etcd:update" } // Create node diff --git a/tests/functional/version_check_test.go b/tests/functional/version_check_test.go new file mode 100644 index 000000000..98a155390 --- /dev/null +++ b/tests/functional/version_check_test.go @@ -0,0 +1,46 @@ +package test + +import ( + "net/http" + "os" + "testing" + "time" +) + +// Ensure that a node can reply to a version check appropriately. +func TestVersionCheck(t *testing.T) { + procAttr := new(os.ProcAttr) + procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr} + args := []string{"etcd", "-n=node1", "-f", "-d=/tmp/version_check"} + + process, err := os.StartProcess(EtcdBinPath, args, procAttr) + if err != nil { + t.Fatal("start process failed:" + err.Error()) + return + } + defer process.Kill() + + time.Sleep(time.Second) + + // Check a version too small. + resp, _ := http.Get("http://localhost:7001/version/1/check") + resp.Body.Close() + if resp.StatusCode != http.StatusForbidden { + t.Fatal("Invalid version check: ", resp.StatusCode) + } + + // Check a version too large. + resp, _ = http.Get("http://localhost:7001/version/3/check") + resp.Body.Close() + if resp.StatusCode != http.StatusForbidden { + t.Fatal("Invalid version check: ", resp.StatusCode) + } + + // Check a version that's just right. + resp, _ = http.Get("http://localhost:7001/version/2/check") + resp.Body.Close() + if resp.StatusCode != http.StatusOK { + t.Fatal("Invalid version check: ", resp.StatusCode) + } +} +