Internal versioning.

release-0.4
Ben Johnson 2013-10-27 12:47:00 -06:00
parent a70aa3e0da
commit aa9ae32998
21 changed files with 250 additions and 109 deletions

View File

@ -14,15 +14,17 @@ func init() {
// The JoinCommand adds a node to the cluster.
type JoinCommand struct {
RaftVersion string `json:"raftVersion"`
MinVersion int `json:"minVersion"`
MaxVersion int `json:"maxVersion"`
Name string `json:"name"`
RaftURL string `json:"raftURL"`
EtcdURL string `json:"etcdURL"`
}
func NewJoinCommand(version, name, raftUrl, etcdUrl string) *JoinCommand {
func NewJoinCommand(minVersion int, maxVersion int, name, raftUrl, etcdUrl string) *JoinCommand {
return &JoinCommand{
RaftVersion: version,
MinVersion: minVersion,
MaxVersion: maxVersion,
Name: name,
RaftURL: raftUrl,
EtcdURL: etcdUrl,
@ -56,7 +58,7 @@ func (c *JoinCommand) Apply(server raft.Server) (interface{}, error) {
}
// Add to shared machine registry.
ps.registry.Register(c.Name, c.RaftVersion, c.RaftURL, c.EtcdURL, server.CommitIndex(), server.Term())
ps.registry.Register(c.Name, c.RaftURL, c.EtcdURL, server.CommitIndex(), server.Term())
// Add peer in raft
err := server.AddPeer(c.Name, "")

View File

@ -10,6 +10,7 @@ import (
"net"
"net/http"
"net/url"
"strconv"
"time"
etcdErr "github.com/coreos/etcd/error"
@ -209,7 +210,7 @@ func (s *PeerServer) SetServer(server *Server) {
func (s *PeerServer) startAsLeader() {
// leader need to join self as a peer
for {
_, err := s.raftServer.Do(NewJoinCommand(PeerVersion, s.raftServer.Name(), s.url, s.server.URL()))
_, err := s.raftServer.Do(NewJoinCommand(store.MinVersion(), store.MaxVersion(), s.raftServer.Name(), s.url, s.server.URL()))
if err == nil {
break
}
@ -245,7 +246,7 @@ func (s *PeerServer) startTransport(scheme string, tlsConf tls.Config) error {
// internal commands
raftMux.HandleFunc("/name", s.NameHttpHandler)
raftMux.HandleFunc("/version", s.RaftVersionHttpHandler)
raftMux.HandleFunc("/version", s.VersionHttpHandler)
raftMux.HandleFunc("/join", s.JoinHttpHandler)
raftMux.HandleFunc("/remove/", s.RemoveHttpHandler)
raftMux.HandleFunc("/vote", s.VoteHttpHandler)
@ -263,21 +264,23 @@ func (s *PeerServer) startTransport(scheme string, tlsConf tls.Config) error {
}
// getVersion fetches the raft version of a peer. This works for now but we
// will need to do something more sophisticated later when we allow mixed
// version clusters.
func getVersion(t *transporter, versionURL url.URL) (string, error) {
// getVersion fetches the peer version of a cluster.
func getVersion(t *transporter, versionURL url.URL) (int, error) {
resp, req, err := t.Get(versionURL.String())
if err != nil {
return "", err
return 0, err
}
defer resp.Body.Close()
t.CancelWhenTimeout(req)
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return 0, err
}
return string(body), nil
// Parse version number.
version, _ := strconv.Atoi(string(body))
return version, nil
}
func (s *PeerServer) joinCluster(cluster []string) bool {
@ -315,14 +318,11 @@ func (s *PeerServer) joinByMachine(server raft.Server, machine string, scheme st
if err != nil {
return fmt.Errorf("Error during join version check: %v", err)
}
// TODO: versioning of the internal protocol. See:
// Documentation/internatl-protocol-versioning.md
if version != PeerVersion {
return fmt.Errorf("Unable to join: internal version mismatch, entire cluster must be running identical versions of etcd")
if version < store.MinVersion() || version > store.MaxVersion() {
return fmt.Errorf("Unable to join: cluster version is %d; version compatibility is %d - %d", version, store.MinVersion(), store.MaxVersion())
}
json.NewEncoder(&b).Encode(NewJoinCommand(PeerVersion, server.Name(), s.url, s.server.URL()))
json.NewEncoder(&b).Encode(NewJoinCommand(store.MinVersion(), store.MaxVersion(), server.Name(), s.url, s.server.URL()))
joinURL := url.URL{Host: machine, Scheme: scheme, Path: "/join"}
@ -347,7 +347,7 @@ func (s *PeerServer) joinByMachine(server raft.Server, machine string, scheme st
if resp.StatusCode == http.StatusTemporaryRedirect {
address := resp.Header.Get("Location")
log.Debugf("Send Join Request to %s", address)
json.NewEncoder(&b).Encode(NewJoinCommand(PeerVersion, server.Name(), s.url, s.server.URL()))
json.NewEncoder(&b).Encode(NewJoinCommand(store.MinVersion(), store.MaxVersion(), server.Name(), s.url, s.server.URL()))
resp, req, err = t.Post(address, &b)
} else if resp.StatusCode == http.StatusBadRequest {

View File

@ -3,6 +3,7 @@ package server
import (
"encoding/json"
"net/http"
"strconv"
etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/log"
@ -151,8 +152,8 @@ func (ps *PeerServer) NameHttpHandler(w http.ResponseWriter, req *http.Request)
}
// Response to the name request
func (ps *PeerServer) RaftVersionHttpHandler(w http.ResponseWriter, req *http.Request) {
func (ps *PeerServer) VersionHttpHandler(w http.ResponseWriter, req *http.Request) {
log.Debugf("[recv] Get %s/version/ ", ps.url)
w.WriteHeader(http.StatusOK)
w.Write([]byte(PeerVersion))
w.Write([]byte(strconv.Itoa(ps.store.Version())))
}

View File

@ -38,13 +38,13 @@ func NewRegistry(s store.Store) *Registry {
}
// Adds a node to the registry.
func (r *Registry) Register(name string, peerVersion string, peerURL string, url string, commitIndex uint64, term uint64) error {
func (r *Registry) Register(name string, peerURL string, url string, commitIndex uint64, term uint64) error {
r.Lock()
defer r.Unlock()
// Write data to store.
key := path.Join(RegistryKey, name)
value := fmt.Sprintf("raft=%s&etcd=%s&raftVersion=%s", peerURL, url, peerVersion)
value := fmt.Sprintf("raft=%s&etcd=%s", peerURL, url)
_, err := r.store.Create(key, value, false, store.Permanent, commitIndex, term)
log.Debugf("Register: %s", name)
return err
@ -175,6 +175,5 @@ func (r *Registry) load(name string) {
r.nodes[name] = &node{
url: m["etcd"][0],
peerURL: m["raft"][0],
peerVersion: m["raftVersion"][0],
}
}

View File

@ -15,6 +15,7 @@ import (
"github.com/coreos/etcd/server/v1"
"github.com/coreos/etcd/server/v2"
"github.com/coreos/etcd/store"
_ "github.com/coreos/etcd/store/v2"
"github.com/coreos/go-raft"
"github.com/gorilla/mux"
)
@ -366,11 +367,7 @@ func (s *Server) SpeedTestHandler(w http.ResponseWriter, req *http.Request) erro
for i := 0; i < count; i++ {
go func() {
for j := 0; j < 10; j++ {
c := &store.SetCommand{
Key: "foo",
Value: "bar",
ExpireTime: time.Unix(0, 0),
}
c := s.Store().CommandFactory().CreateSetCommand("foo", "bar", time.Unix(0, 0))
s.peerServer.RaftServer().Do(c)
}
c <- true

View File

@ -1,7 +1,6 @@
package v1
import (
"github.com/coreos/etcd/store"
"github.com/gorilla/mux"
"net/http"
)
@ -10,6 +9,6 @@ import (
func DeleteKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error {
vars := mux.Vars(req)
key := "/" + vars["key"]
c := &store.DeleteCommand{Key: key}
c := s.Store().CommandFactory().CreateDeleteCommand(key, false)
return s.Dispatch(c, w, req)
}

View File

@ -31,27 +31,16 @@ func SetKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error {
// If the "prevValue" is specified then test-and-set. Otherwise create a new key.
var c raft.Command
if prevValueArr, ok := req.Form["prevValue"]; ok {
if len(prevValueArr[0]) > 0 { // test against previous value
c = &store.CompareAndSwapCommand{
Key: key,
Value: value,
PrevValue: prevValueArr[0],
ExpireTime: expireTime,
}
if len(prevValueArr[0]) > 0 {
// test against previous value
c = s.Store().CommandFactory().CreateCompareAndSwapCommand(key, value, prevValueArr[0], 0, expireTime)
} else {
c = &store.CreateCommand{ // test against existence
Key: key,
Value: value,
ExpireTime: expireTime,
}
// test against existence
c = s.Store().CommandFactory().CreateCreateCommand(key, value, expireTime, false)
}
} else {
c = &store.SetCommand{
Key: key,
Value: value,
ExpireTime: expireTime,
}
c = s.Store().CommandFactory().CreateSetCommand(key, value, expireTime)
}
return s.Dispatch(c, w, req)

View File

@ -3,18 +3,14 @@ package v2
import (
"net/http"
"github.com/coreos/etcd/store"
"github.com/gorilla/mux"
)
func DeleteHandler(w http.ResponseWriter, req *http.Request, s Server) error {
vars := mux.Vars(req)
key := "/" + vars["key"]
recursive := (req.FormValue("recursive") == "true")
c := &store.DeleteCommand{
Key: key,
Recursive: (req.FormValue("recursive") == "true"),
}
c := s.Store().CommandFactory().CreateDeleteCommand(key, recursive)
return s.Dispatch(c, w, req)
}

View File

@ -18,12 +18,6 @@ func PostHandler(w http.ResponseWriter, req *http.Request, s Server) error {
return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Create", store.UndefIndex, store.UndefTerm)
}
c := &store.CreateCommand{
Key: key,
Value: value,
ExpireTime: expireTime,
Unique: true,
}
c := s.Store().CommandFactory().CreateCreateCommand(key, value, expireTime, true)
return s.Dispatch(c, w, req)
}

View File

@ -71,31 +71,17 @@ func PutHandler(w http.ResponseWriter, req *http.Request, s Server) error {
}
}
c = &store.CompareAndSwapCommand{
Key: key,
Value: value,
PrevValue: prevValue,
PrevIndex: prevIndex,
}
c = s.Store().CommandFactory().CreateCompareAndSwapCommand(key, value, prevValue, prevIndex, store.Permanent)
return s.Dispatch(c, w, req)
}
func SetHandler(w http.ResponseWriter, req *http.Request, s Server, key, value string, expireTime time.Time) error {
c := &store.SetCommand{
Key: key,
Value: value,
ExpireTime: expireTime,
}
c := s.Store().CommandFactory().CreateSetCommand(key, value, expireTime)
return s.Dispatch(c, w, req)
}
func CreateHandler(w http.ResponseWriter, req *http.Request, s Server, key, value string, expireTime time.Time) error {
c := &store.CreateCommand{
Key: key,
Value: value,
ExpireTime: expireTime,
}
c := s.Store().CommandFactory().CreateCreateCommand(key, value, expireTime, false)
return s.Dispatch(c, w, req)
}
@ -105,10 +91,6 @@ func UpdateHandler(w http.ResponseWriter, req *http.Request, s Server, key, valu
return etcdErr.NewError(etcdErr.EcodeValueOrTTLRequired, "Update", store.UndefIndex, store.UndefTerm)
}
c := &store.UpdateCommand{
Key: key,
Value: value,
ExpireTime: expireTime,
}
c := s.Store().CommandFactory().CreateUpdateCommand(key, value, expireTime)
return s.Dispatch(c, w, req)
}

View File

@ -0,0 +1,29 @@
package v2
import (
"fmt"
"net/url"
"testing"
"github.com/coreos/etcd/server"
"github.com/coreos/etcd/tests"
"github.com/stretchr/testify/assert"
)
// Ensures that a key is deleted.
//
// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX
// $ curl -X DELETE localhost:4001/v2/keys/foo/bar
//
func TestV2DeleteKey(t *testing.T) {
tests.RunServer(func(s *server.Server) {
v := url.Values{}
v.Set("value", "XXX")
resp, err := tests.PutForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar"), v)
tests.ReadBody(resp)
resp, err = tests.DeleteForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar"), url.Values{})
body := tests.ReadBody(resp)
assert.Nil(t, err, "")
assert.Equal(t, string(body), `{"action":"delete","key":"/foo/bar","prevValue":"XXX","index":4,"term":0}`, "")
})
}

View File

@ -1,8 +1,3 @@
package server
const Version = "v2"
// TODO: The release version (generated from the git tag) will be the raft
// protocol version for now. When things settle down we will fix it like the
// client API above.
const PeerVersion = ReleaseVersion

57
store/command_factory.go Normal file
View File

@ -0,0 +1,57 @@
package store
import (
"fmt"
"time"
"github.com/coreos/go-raft"
)
// A lookup of factories by version.
var factories = make(map[int]CommandFactory)
var minVersion, maxVersion int
// The CommandFactory provides a way to create different types of commands
// depending on the current version of the store.
type CommandFactory interface {
Version() int
CreateSetCommand(key string, value string, expireTime time.Time) raft.Command
CreateCreateCommand(key string, value string, expireTime time.Time, unique bool) raft.Command
CreateUpdateCommand(key string, value string, expireTime time.Time) raft.Command
CreateDeleteCommand(key string, recursive bool) raft.Command
CreateCompareAndSwapCommand(key string, value string, prevValue string, prevIndex uint64, expireTime time.Time) raft.Command
}
// RegisterCommandFactory adds a command factory to the global registry.
func RegisterCommandFactory(factory CommandFactory) {
version := factory.Version()
if GetCommandFactory(version) != nil {
panic(fmt.Sprintf("Command factory already registered for version: %d", factory.Version()))
}
factories[version] = factory
// Update compatibility versions.
if minVersion == 0 || version > minVersion {
minVersion = version
}
if maxVersion == 0 || version > maxVersion {
maxVersion = version
}
}
// GetCommandFactory retrieves a command factory for a given command version.
func GetCommandFactory(version int) CommandFactory {
return factories[version]
}
// MinVersion returns the minimum compatible store version.
func MinVersion() int {
return minVersion
}
// MaxVersion returns the maximum compatible store version.
func MaxVersion() int {
return maxVersion
}

View File

@ -13,7 +13,12 @@ import (
etcdErr "github.com/coreos/etcd/error"
)
// The default version to set when the store is first initialized.
const defaultVersion = 2
type Store interface {
Version() int
CommandFactory() CommandFactory
Get(nodePath string, recursive, sorted bool, index uint64, term uint64) (*Event, error)
Set(nodePath string, value string, expireTime time.Time, index uint64, term uint64) (*Event, error)
Update(nodePath string, newValue string, expireTime time.Time, index uint64, term uint64) (*Event, error)
@ -34,6 +39,7 @@ type store struct {
Index uint64
Term uint64
Stats *Stats
CurrentVersion int
worldLock sync.RWMutex // stop the world lock
}
@ -43,13 +49,23 @@ func New() Store {
func newStore() *store {
s := new(store)
s.CurrentVersion = defaultVersion
s.Root = newDir(s, "/", UndefIndex, UndefTerm, nil, "", Permanent)
s.Stats = newStats()
s.WatcherHub = newWatchHub(1000)
return s
}
// Version retrieves current version of the store.
func (s *store) Version() int {
return s.CurrentVersion
}
// CommandFactory retrieves the command factory for the current version of the store.
func (s *store) CommandFactory() CommandFactory {
return GetCommandFactory(s.Version())
}
// Get function returns a get event.
// If recursive is true, it will return all the content under the node path.
// If sorted is true, it will sort the content by keys.
@ -449,6 +465,7 @@ func (s *store) Save() ([]byte, error) {
clonedStore.Root = s.Root.Clone()
clonedStore.WatcherHub = s.WatcherHub.clone()
clonedStore.Stats = s.Stats.clone()
clonedStore.CurrentVersion = s.CurrentVersion
s.worldLock.Unlock()
@ -482,3 +499,4 @@ func (s *store) JsonStats() []byte {
s.Stats.Watchers = uint64(s.WatcherHub.count)
return s.Stats.toJson()
}

View File

@ -0,0 +1,68 @@
package v2
import (
"time"
"github.com/coreos/etcd/store"
"github.com/coreos/go-raft"
)
func init() {
store.RegisterCommandFactory(&CommandFactory{})
}
// CommandFactory provides a pluggable way to create version 2 commands.
type CommandFactory struct {
}
// Version returns the version of this factory.
func (f *CommandFactory) Version() int {
return 2
}
// CreateSetCommand creates a version 2 command to set a key to a given value in the store.
func (f *CommandFactory) CreateSetCommand(key string, value string, expireTime time.Time) raft.Command {
return &SetCommand{
Key: key,
Value: value,
ExpireTime: expireTime,
}
}
// CreateCreateCommand creates a version 2 command to create a new key in the store.
func (f *CommandFactory) CreateCreateCommand(key string, value string, expireTime time.Time, unique bool) raft.Command {
return &CreateCommand{
Key: key,
Value: value,
ExpireTime: expireTime,
Unique: unique,
}
}
// CreateUpdateCommand creates a version 2 command to update a key to a given value in the store.
func (f *CommandFactory) CreateUpdateCommand(key string, value string, expireTime time.Time) raft.Command {
return &UpdateCommand{
Key: key,
Value: value,
ExpireTime: expireTime,
}
}
// CreateDeleteCommand creates a version 2 command to delete a key from the store.
func (f *CommandFactory) CreateDeleteCommand(key string, recursive bool) raft.Command {
return &DeleteCommand{
Key: key,
Recursive: recursive,
}
}
// CreateCompareAndSwapCommand creates a version 2 command to conditionally set a key in the store.
func (f *CommandFactory) CreateCompareAndSwapCommand(key string, value string, prevValue string, prevIndex uint64, expireTime time.Time) raft.Command {
return &CompareAndSwapCommand{
Key: key,
Value: value,
PrevValue: prevValue,
PrevIndex: prevIndex,
ExpireTime: expireTime,
}
}

View File

@ -1,9 +1,10 @@
package store
package v2
import (
"time"
"github.com/coreos/etcd/log"
"github.com/coreos/etcd/store"
"github.com/coreos/go-raft"
)
@ -22,12 +23,12 @@ type CompareAndSwapCommand struct {
// The name of the testAndSet command in the log
func (c *CompareAndSwapCommand) CommandName() string {
return "etcd:compareAndSwap"
return "etcd:v2:compareAndSwap"
}
// Set the key-value pair if the current value of the key equals to the given prevValue
func (c *CompareAndSwapCommand) Apply(server raft.Server) (interface{}, error) {
s, _ := server.StateMachine().(Store)
s, _ := server.StateMachine().(store.Store)
e, err := s.CompareAndSwap(c.Key, c.PrevValue, c.PrevIndex,
c.Value, c.ExpireTime, server.CommitIndex(), server.Term())

View File

@ -1,9 +1,11 @@
package store
package v2
import (
"github.com/coreos/etcd/log"
"github.com/coreos/go-raft"
"time"
"github.com/coreos/etcd/log"
"github.com/coreos/etcd/store"
"github.com/coreos/go-raft"
)
func init() {
@ -20,12 +22,12 @@ type CreateCommand struct {
// The name of the create command in the log
func (c *CreateCommand) CommandName() string {
return "etcd:create"
return "etcd:v2:create"
}
// Create node
func (c *CreateCommand) Apply(server raft.Server) (interface{}, error) {
s, _ := server.StateMachine().(Store)
s, _ := server.StateMachine().(store.Store)
e, err := s.Create(c.Key, c.Value, c.Unique, c.ExpireTime, server.CommitIndex(), server.Term())

View File

@ -1,6 +1,7 @@
package store
package v2
import (
"github.com/coreos/etcd/store"
"github.com/coreos/etcd/log"
"github.com/coreos/go-raft"
)
@ -17,12 +18,12 @@ type DeleteCommand struct {
// The name of the delete command in the log
func (c *DeleteCommand) CommandName() string {
return "etcd:delete"
return "etcd:v2:delete"
}
// Delete the key
func (c *DeleteCommand) Apply(server raft.Server) (interface{}, error) {
s, _ := server.StateMachine().(Store)
s, _ := server.StateMachine().(store.Store)
e, err := s.Delete(c.Key, c.Recursive, server.CommitIndex(), server.Term())

View File

@ -1,9 +1,11 @@
package store
package v2
import (
"github.com/coreos/etcd/log"
"github.com/coreos/go-raft"
"time"
"github.com/coreos/etcd/log"
"github.com/coreos/etcd/store"
"github.com/coreos/go-raft"
)
func init() {
@ -19,12 +21,12 @@ type SetCommand struct {
// The name of the create command in the log
func (c *SetCommand) CommandName() string {
return "etcd:set"
return "etcd:v2:set"
}
// Create node
func (c *SetCommand) Apply(server raft.Server) (interface{}, error) {
s, _ := server.StateMachine().(Store)
s, _ := server.StateMachine().(store.Store)
// create a new node or replace the old node.
e, err := s.Set(c.Key, c.Value, c.ExpireTime, server.CommitIndex(), server.Term())

View File

@ -1,7 +1,8 @@
package store
package v2
import (
"github.com/coreos/etcd/log"
"github.com/coreos/etcd/store"
"github.com/coreos/go-raft"
"time"
)
@ -19,12 +20,12 @@ type UpdateCommand struct {
// The name of the update command in the log
func (c *UpdateCommand) CommandName() string {
return "etcd:update"
return "etcd:v2:update"
}
// Create node
func (c *UpdateCommand) Apply(server raft.Server) (interface{}, error) {
s, _ := server.StateMachine().(Store)
s, _ := server.StateMachine().(store.Store)
e, err := s.Update(c.Key, c.Value, c.ExpireTime, server.CommitIndex(), server.Term())

View File

@ -55,6 +55,14 @@ func PutForm(url string, data url.Values) (*http.Response, error) {
return Put(url, "application/x-www-form-urlencoded", strings.NewReader(data.Encode()))
}
func Delete(url string, bodyType string, body io.Reader) (*http.Response, error) {
return send("DELETE", url, bodyType, body)
}
func DeleteForm(url string, data url.Values) (*http.Response, error) {
return Delete(url, "application/x-www-form-urlencoded", strings.NewReader(data.Encode()))
}
func send(method string, url string, bodyType string, body io.Reader) (*http.Response, error) {
c := NewHTTPClient()