refactor(compareAndDelete)

release-0.4
Xiang Li 2013-12-20 05:10:22 +08:00
parent e2fa89d554
commit 9cf1fcc987
8 changed files with 74 additions and 168 deletions

View File

@ -44,6 +44,6 @@ func DeleteHandler(w http.ResponseWriter, req *http.Request, s Server) error {
}
}
c := s.Store().CommandFactory().CreateCompareAndDeleteCommand(key, recursive, prevValue, prevIndex)
c := s.Store().CommandFactory().CreateCompareAndDeleteCommand(key, prevValue, prevIndex)
return s.Dispatch(c, w, req)
}

View File

@ -84,95 +84,42 @@ func TestV2DeleteDirectoryRecursiveImpliesDir(t *testing.T) {
})
}
// Ensures that a directory is deleted.
// Ensures that a key is deleted if the previous index matches
//
// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX
// $ curl -X DELETE localhost:4001/v2/keys/foo?recursive=true
//
func TestV2DeleteDirectory(t *testing.T) {
tests.RunServer(func(s *server.Server) {
v := url.Values{}
v.Set("value", "XXX")
resp, err := tests.PutForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar"), v)
tests.ReadBody(resp)
resp, err = tests.DeleteForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo?recursive=true"), url.Values{})
assert.Nil(t, err, "")
body := tests.ReadBodyJSON(resp)
assert.Equal(t, body["action"], "delete", "")
assert.Equal(t, body["dir"], true, "")
assert.Equal(t, body["modifiedIndex"], 2, "")
})
}
// Ensures that a directory is deleted if the previous index matches
//
// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX
// $ curl -X DELETE localhost:4001/v2/keys/foo?recursive=true&prevIndex=1
//
func TestV2DeleteDirectoryCADOnIndexSuccess(t *testing.T) {
tests.RunServer(func(s *server.Server) {
v := url.Values{}
v.Set("value", "XXX")
resp, err := tests.PutForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar"), v)
tests.ReadBody(resp)
resp, err = tests.DeleteForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo?recursive=true&prevIndex=1"), url.Values{})
assert.Nil(t, err, "")
body := tests.ReadBodyJSON(resp)
assert.Equal(t, body["action"], "compareAndDelete", "")
assert.Equal(t, body["dir"], true, "")
assert.Equal(t, body["modifiedIndex"], 2, "")
})
}
// Ensures that a directory is not deleted if the previous index does not match
//
// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX
// $ curl -X DELETE localhost:4001/v2/keys/foo?recursive=true&prevIndex=100
//
func TestV2DeleteDirectoryCADOnIndexFail(t *testing.T) {
tests.RunServer(func(s *server.Server) {
v := url.Values{}
v.Set("value", "XXX")
resp, err := tests.PutForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar"), v)
tests.ReadBody(resp)
resp, err = tests.DeleteForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo?recursive=true&prevIndex=100"), url.Values{})
assert.Nil(t, err, "")
body := tests.ReadBodyJSON(resp)
assert.Equal(t, body["errorCode"], 101)
})
}
// Ensures that a key is deleted only if the previous index matches.
//
// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX
// $ curl -X DELETE localhost:4001/v2/keys/foo/bar?prevIndex=1
// $ curl -X PUT localhost:4001/v2/keys/foo -d value=XXX
// $ curl -X DELETE localhost:4001/v2/keys/foo?prevIndex=1
//
func TestV2DeleteKeyCADOnIndexSuccess(t *testing.T) {
tests.RunServer(func(s *server.Server) {
v := url.Values{}
v.Set("value", "XXX")
resp, _ := tests.PutForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar"), v)
resp, err := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo"), v)
tests.ReadBody(resp)
resp, _ = tests.DeleteForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar?prevIndex=1"), v)
resp, err = tests.DeleteForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo?prevIndex=2"), url.Values{})
assert.Nil(t, err, "")
fmt.Println(resp)
body := tests.ReadBodyJSON(resp)
assert.Equal(t, body["action"], "compareAndDelete", "")
assert.Equal(t, body["prevValue"], "XXX", "")
assert.Equal(t, body["modifiedIndex"], 2, "")
node := body["node"].(map[string]interface{})
assert.Equal(t, node["key"], "/foo", "")
assert.Equal(t, node["modifiedIndex"], 3, "")
})
}
// Ensures that a key is not deleted if the previous index does not matche.
// Ensures that a key is not deleted if the previous index does not match
//
// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX
// $ curl -X DELETE localhost:4001/v2/keys/foo/bar?prevIndex=2
// $ curl -X PUT localhost:4001/v2/keys/foo -d value=XXX
// $ curl -X DELETE localhost:4001/v2/keys/foo?prevIndex=100
//
func TestV2DeleteKeyCADOnIndexFail(t *testing.T) {
tests.RunServer(func(s *server.Server) {
v := url.Values{}
v.Set("value", "XXX")
resp, _ := tests.PutForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar"), v)
resp, err := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo"), v)
tests.ReadBody(resp)
resp, _ = tests.DeleteForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar?prevIndex=100"), v)
resp, err = tests.DeleteForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo?prevIndex=100"), url.Values{})
assert.Nil(t, err, "")
body := tests.ReadBodyJSON(resp)
assert.Equal(t, body["errorCode"], 101)
})
@ -187,9 +134,9 @@ func TestV2DeleteKeyCADWithInvalidIndex(t *testing.T) {
tests.RunServer(func(s *server.Server) {
v := url.Values{}
v.Set("value", "XXX")
resp, _ := tests.PutForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar"), v)
resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
tests.ReadBody(resp)
resp, _ = tests.DeleteForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar?prevIndex=bad_index"), v)
resp, _ = tests.DeleteForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar?prevIndex=bad_index"), v)
body := tests.ReadBodyJSON(resp)
assert.Equal(t, body["errorCode"], 203)
})
@ -204,17 +151,18 @@ func TestV2DeleteKeyCADOnValueSuccess(t *testing.T) {
tests.RunServer(func(s *server.Server) {
v := url.Values{}
v.Set("value", "XXX")
resp, _ := tests.PutForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar"), v)
resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
tests.ReadBody(resp)
resp, _ = tests.DeleteForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar?prevValue=XXX"), v)
resp, _ = tests.DeleteForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar?prevValue=XXX"), v)
body := tests.ReadBodyJSON(resp)
assert.Equal(t, body["action"], "compareAndDelete", "")
assert.Equal(t, body["prevValue"], "XXX", "")
assert.Equal(t, body["modifiedIndex"], 2, "")
node := body["node"].(map[string]interface{})
assert.Equal(t, node["modifiedIndex"], 3, "")
})
}
// Ensures that a key is not deleted if the previous value does not matche.
// Ensures that a key is not deleted if the previous value does not match.
//
// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX
// $ curl -X DELETE localhost:4001/v2/keys/foo/bar?prevValue=YYY
@ -223,9 +171,9 @@ func TestV2DeleteKeyCADOnValueFail(t *testing.T) {
tests.RunServer(func(s *server.Server) {
v := url.Values{}
v.Set("value", "XXX")
resp, _ := tests.PutForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar"), v)
resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
tests.ReadBody(resp)
resp, _ = tests.DeleteForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar?prevValue=YYY"), v)
resp, _ = tests.DeleteForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar?prevValue=YYY"), v)
body := tests.ReadBodyJSON(resp)
assert.Equal(t, body["errorCode"], 101)
})
@ -240,9 +188,9 @@ func TestV2DeleteKeyCADWithInvalidValue(t *testing.T) {
tests.RunServer(func(s *server.Server) {
v := url.Values{}
v.Set("value", "XXX")
resp, _ := tests.PutForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar"), v)
resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
tests.ReadBody(resp)
resp, _ = tests.DeleteForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar?prevValue="), v)
resp, _ = tests.DeleteForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar?prevValue="), v)
body := tests.ReadBodyJSON(resp)
assert.Equal(t, body["errorCode"], 201)
})

View File

@ -22,7 +22,7 @@ type CommandFactory interface {
CreateDeleteCommand(key string, dir, recursive bool) raft.Command
CreateCompareAndSwapCommand(key string, value string, prevValue string,
prevIndex uint64, expireTime time.Time) raft.Command
CreateCompareAndDeleteCommand(key string, recursive bool, prevValue string, prevIndex uint64) raft.Command
CreateCompareAndDeleteCommand(key string, prevValue string, prevIndex uint64) raft.Command
CreateSyncCommand(now time.Time) raft.Command
}

View File

@ -306,6 +306,13 @@ func (n *node) UpdateTTL(expireTime time.Time) {
}
}
func (n *node) Compare(prevValue string, prevIndex uint64) bool {
compareValue := (prevValue == "" || n.Value == prevValue)
compareIndex := (prevIndex == 0 || n.ModifiedIndex == prevIndex)
return compareValue && compareIndex
}
// Clone function clone the node recursively and return the new node.
// If the node is a directory, it will clone all the content under this directory.
// If the node is a key-value pair, it will clone the pair.

View File

@ -51,7 +51,7 @@ type Store interface {
CompareAndSwap(nodePath string, prevValue string, prevIndex uint64,
value string, expireTime time.Time) (*Event, error)
Delete(nodePath string, recursive, dir bool) (*Event, error)
CompareAndDelete(nodePath string, recursive bool, prevValue string, prevIndex uint64) (*Event, error)
CompareAndDelete(nodePath string, prevValue string, prevIndex uint64) (*Event, error)
Watch(prefix string, recursive bool, sinceIndex uint64) (<-chan *Event, error)
Save() ([]byte, error)
@ -208,14 +208,14 @@ func (s *store) CompareAndSwap(nodePath string, prevValue string, prevIndex uint
return nil, err
}
if n.IsDir() { // can only test and set file
if n.IsDir() { // can only compare and swap file
s.Stats.Inc(CompareAndSwapFail)
return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, s.CurrentIndex)
}
// If both of the prevValue and prevIndex are given, we will test both of them.
// Command will be executed, only if both of the tests are successful.
if (prevValue == "" || n.Value == prevValue) && (prevIndex == 0 || n.ModifiedIndex == prevIndex) {
if n.Compare(prevValue, prevIndex) {
// update etcd index
s.CurrentIndex++
@ -258,8 +258,6 @@ func (s *store) Delete(nodePath string, dir, recursive bool) (*Event, error) {
dir = true
}
nextIndex := s.CurrentIndex + 1
n, err := s.internalGet(nodePath)
if err != nil { // if the node does not exist, return error
@ -267,6 +265,7 @@ func (s *store) Delete(nodePath string, dir, recursive bool) (*Event, error) {
return nil, err
}
nextIndex := s.CurrentIndex + 1
e := newEvent(Delete, nodePath, nextIndex, n.CreatedIndex)
eNode := e.Node
@ -297,7 +296,7 @@ func (s *store) Delete(nodePath string, dir, recursive bool) (*Event, error) {
return e, nil
}
func (s *store) CompareAndDelete(nodePath string, recursive bool, prevValue string, prevIndex uint64) (*Event, error) {
func (s *store) CompareAndDelete(nodePath string, prevValue string, prevIndex uint64) (*Event, error) {
nodePath = path.Clean(path.Join("/", nodePath))
s.worldLock.Lock()
@ -310,31 +309,26 @@ func (s *store) CompareAndDelete(nodePath string, recursive bool, prevValue stri
return nil, err
}
isDir := n.IsDir()
if n.IsDir() { // can only compare and delete file
s.Stats.Inc(CompareAndSwapFail)
return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, s.CurrentIndex)
}
// If both of the prevValue and prevIndex are given, we will test both of them.
// Command will be executed, only if both of the tests are successful.
if (isDir || prevValue == "" || n.Value == prevValue) && (prevIndex == 0 || n.ModifiedIndex == prevIndex) {
if n.Compare(prevValue, prevIndex) {
// update etcd index
s.CurrentIndex++
e := newEvent(CompareAndDelete, nodePath, s.CurrentIndex+1, n.CreatedIndex)
if isDir {
e.Node.Dir = true
}
e := newEvent(CompareAndDelete, nodePath, s.CurrentIndex, n.CreatedIndex)
callback := func(path string) { // notify function
// notify the watchers with deleted set true
s.WatcherHub.notifyWatchers(e, path, true)
}
err = n.Remove(true, recursive, callback)
if err != nil {
s.Stats.Inc(CompareAndDeleteFail)
return nil, err
}
// update etcd index
s.CurrentIndex++
// delete a key-value pair, no error should happen
n.Remove(false, false, callback)
s.WatcherHub.notify(e)
s.Stats.Inc(CompareAndDeleteSuccess)

View File

@ -341,95 +341,54 @@ func TestRootRdOnly(t *testing.T) {
func TestStoreCompareAndDeletePrevValue(t *testing.T) {
s := newStore()
s.Create("/foo", "bar", false, Permanent)
e, err := s.CompareAndDelete("/foo", false, "bar", 0)
s.Create("/foo", false, "bar", false, Permanent)
e, err := s.CompareAndDelete("/foo", "bar", 0)
assert.Nil(t, err, "")
assert.Equal(t, e.Action, "compareAndDelete", "")
assert.Equal(t, e.Dir, false, "")
assert.Equal(t, e.Node.Key, "/foo", "")
}
func TestStoreCompareAndDeletePrevValueFailsIfNotMatch(t *testing.T) {
s := newStore()
s.Create("/foo", "bar", false, Permanent)
e, _err := s.CompareAndDelete("/foo", false, "baz", 0)
s.Create("/foo", false, "bar", false, Permanent)
e, _err := s.CompareAndDelete("/foo", "baz", 0)
err := _err.(*etcdErr.Error)
assert.Equal(t, err.ErrorCode, etcdErr.EcodeTestFailed, "")
assert.Equal(t, err.Message, "Test Failed", "")
assert.Equal(t, err.Message, "Compare failed", "")
assert.Nil(t, e, "")
e, _ = s.Get("/foo", false, false)
assert.Equal(t, e.Value, "bar", "")
assert.Equal(t, e.Node.Value, "bar", "")
}
func TestStoreCompareAndDeletePrevIndex(t *testing.T) {
s := newStore()
s.Create("/foo", "bar", false, Permanent)
e, err := s.CompareAndDelete("/foo", false, "", 1)
s.Create("/foo", false, "bar", false, Permanent)
e, err := s.CompareAndDelete("/foo", "", 1)
assert.Nil(t, err, "")
assert.Equal(t, e.Action, "compareAndDelete", "")
}
func TestStoreCompareAndDeletePrevIndexFailsIfNotMatch(t *testing.T) {
s := newStore()
s.Create("/foo", "bar", false, Permanent)
e, _err := s.CompareAndDelete("/foo", false, "", 100)
s.Create("/foo", false, "bar", false, Permanent)
e, _err := s.CompareAndDelete("/foo", "", 100)
assert.NotNil(t, _err, "")
err := _err.(*etcdErr.Error)
assert.Equal(t, err.ErrorCode, etcdErr.EcodeTestFailed, "")
assert.Equal(t, err.Message, "Test Failed", "")
assert.Equal(t, err.Message, "Compare failed", "")
assert.Nil(t, e, "")
e, _ = s.Get("/foo", false, false)
assert.Equal(t, e.Value, "bar", "")
assert.Equal(t, e.Node.Value, "bar", "")
}
// Ensure that the store can delete a directory if recursive is specified.
func TestStoreCompareAndDeleteDiretory(t *testing.T) {
// Ensure that the store cannot delete a directory.
func TestStoreCompareAndDeleteDiretoryFail(t *testing.T) {
s := newStore()
s.Create("/foo", "", false, Permanent)
e, err := s.CompareAndDelete("/foo", true, "", 0)
assert.Nil(t, err, "")
assert.Equal(t, e.Action, "compareAndDelete", "")
assert.Equal(t, e.Dir, true, "")
}
// Ensure that the store can delete a directory if recursive is specified.
func TestStoreCompareAndDeleteDiretoryIgnoringPrevValue(t *testing.T) {
s := newStore()
s.Create("/foo", "", false, Permanent)
e, err := s.CompareAndDelete("/foo", true, "baz", 0)
assert.Nil(t, err, "")
assert.Equal(t, e.Action, "compareAndDelete", "")
}
// Ensure that the store can delete a directory with a prev index.
func TestStoreCompareAndDeleteDirectoryPrevIndex(t *testing.T) {
s := newStore()
s.Create("/foo", "", false, Permanent)
e, err := s.CompareAndDelete("/foo", true, "", 1)
assert.Nil(t, err, "")
assert.Equal(t, e.Action, "compareAndDelete", "")
}
// Ensure that the store won't delete a directory if prevIndex does not match
func TestStoreCompareAndDeleteDirectoryPrevIndexFailsIfNotMatch(t *testing.T) {
s := newStore()
s.Create("/foo", "", false, Permanent)
e, _err := s.CompareAndDelete("/foo", true, "", 100)
err := _err.(*etcdErr.Error)
assert.Equal(t, err.ErrorCode, etcdErr.EcodeTestFailed, "")
assert.Equal(t, err.Message, "Test Failed", "")
assert.Nil(t, e, "")
e, _ = s.Get("/foo", false, false)
assert.Equal(t, e.Value, "", "")
}
// Ensure that the store cannot delete a directory if recursive is not specified.
func TestStoreCompareAndDeleteDiretoryFailsIfNonRecursive(t *testing.T) {
s := newStore()
s.Create("/foo", "", false, Permanent)
e, _err := s.CompareAndDelete("/foo", false, "", 0)
s.Create("/foo", true, "", false, Permanent)
_, _err := s.CompareAndDelete("/foo", "", 0)
assert.NotNil(t, _err, "")
err := _err.(*etcdErr.Error)
assert.Equal(t, err.ErrorCode, etcdErr.EcodeNotFile, "")
assert.Equal(t, err.Message, "Not A File", "")
assert.Nil(t, e, "")
}
// Ensure that the store can conditionally update a key if it has a previous value.

View File

@ -76,10 +76,9 @@ func (f *CommandFactory) CreateCompareAndSwapCommand(key string, value string, p
}
// CreateCompareAndDeleteCommand creates a version 2 command to conditionally delete a key from the store.
func (f *CommandFactory) CreateCompareAndDeleteCommand(key string, recursive bool, prevValue string, prevIndex uint64) raft.Command {
func (f *CommandFactory) CreateCompareAndDeleteCommand(key string, prevValue string, prevIndex uint64) raft.Command {
return &CompareAndDeleteCommand{
Key: key,
Recursive: recursive,
PrevValue: prevValue,
PrevIndex: prevIndex,
}

View File

@ -15,7 +15,6 @@ type CompareAndDeleteCommand struct {
Key string `json:"key"`
PrevValue string `json:"prevValue"`
PrevIndex uint64 `json:"prevIndex"`
Recursive bool `json:"recursive"`
}
// The name of the compareAndDelete command in the log
@ -27,7 +26,7 @@ func (c *CompareAndDeleteCommand) CommandName() string {
func (c *CompareAndDeleteCommand) Apply(server raft.Server) (interface{}, error) {
s, _ := server.StateMachine().(store.Store)
e, err := s.CompareAndDelete(c.Key, c.Recursive, c.PrevValue, c.PrevIndex)
e, err := s.CompareAndDelete(c.Key, c.PrevValue, c.PrevIndex)
if err != nil {
log.Debug(err)