finish todo
parent
9412c86b97
commit
512dede9ce
|
@ -16,10 +16,11 @@ const (
|
|||
EcodeNodeExist = 105
|
||||
EcodeKeyIsPreserved = 106
|
||||
|
||||
EcodeValueRequired = 200
|
||||
EcodePrevValueRequired = 201
|
||||
EcodeTTLNaN = 202
|
||||
EcodeIndexNaN = 203
|
||||
EcodeValueRequired = 200
|
||||
EcodePrevValueRequired = 201
|
||||
EcodeTTLNaN = 202
|
||||
EcodeIndexNaN = 203
|
||||
EcodeValueOrTTLRequired = 204
|
||||
|
||||
EcodeRaftInternal = 300
|
||||
EcodeLeaderElect = 301
|
||||
|
@ -45,6 +46,7 @@ func init() {
|
|||
errors[EcodePrevValueRequired] = "PrevValue is Required in POST form"
|
||||
errors[EcodeTTLNaN] = "The given TTL in POST form is not a number"
|
||||
errors[EcodeIndexNaN] = "The given index in POST form is not a number"
|
||||
errors[EcodeValueOrTTLRequired] = "Value or TTL is required in POST form"
|
||||
|
||||
// raft related errors
|
||||
errors[EcodeRaftInternal] = "Raft Internal Error"
|
||||
|
|
|
@ -111,24 +111,26 @@ func UpdateHttpHandler(w http.ResponseWriter, req *http.Request) error {
|
|||
|
||||
debugf("recv.put[%v] [%v%v]\n", req.RemoteAddr, req.Host, req.URL)
|
||||
|
||||
value := req.FormValue("value")
|
||||
req.ParseForm()
|
||||
|
||||
expireTime, err := durationToExpireTime(req.FormValue("ttl"))
|
||||
value := req.Form.Get("value")
|
||||
|
||||
expireTime, err := durationToExpireTime(req.Form.Get("ttl"))
|
||||
|
||||
if err != nil {
|
||||
return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Update")
|
||||
}
|
||||
|
||||
// TODO: update should give at least one option
|
||||
// update should give at least one option
|
||||
if value == "" && expireTime.Sub(store.Permanent) == 0 {
|
||||
return nil
|
||||
return etcdErr.NewError(etcdErr.EcodeValueOrTTLRequired, "Update")
|
||||
}
|
||||
|
||||
prevValue := req.FormValue("prevValue")
|
||||
prevValue, valueOk := req.Form["prevValue"]
|
||||
|
||||
prevIndexStr := req.FormValue("prevIndex")
|
||||
prevIndexStr, indexOk := req.Form["prevIndex"]
|
||||
|
||||
if prevValue == "" && prevIndexStr == "" { // update without test
|
||||
if !valueOk && !indexOk { // update without test
|
||||
command := &UpdateCommand{
|
||||
Key: key,
|
||||
Value: value,
|
||||
|
@ -140,19 +142,21 @@ func UpdateHttpHandler(w http.ResponseWriter, req *http.Request) error {
|
|||
} else { // update with test
|
||||
var prevIndex uint64
|
||||
|
||||
if prevIndexStr != "" {
|
||||
prevIndex, err = strconv.ParseUint(prevIndexStr, 10, 64)
|
||||
}
|
||||
if indexOk {
|
||||
prevIndex, err = strconv.ParseUint(prevIndexStr[0], 10, 64)
|
||||
|
||||
// TODO: add error type
|
||||
if err != nil {
|
||||
return nil
|
||||
// bad previous index
|
||||
if err != nil {
|
||||
return etcdErr.NewError(etcdErr.EcodeIndexNaN, "Update")
|
||||
}
|
||||
} else {
|
||||
prevIndex = 0
|
||||
}
|
||||
|
||||
command := &TestAndSetCommand{
|
||||
Key: key,
|
||||
Value: value,
|
||||
PrevValue: prevValue,
|
||||
PrevValue: prevValue[0],
|
||||
PrevIndex: prevIndex,
|
||||
}
|
||||
|
||||
|
@ -185,9 +189,8 @@ func dispatchEtcdCommand(c Command, w http.ResponseWriter, req *http.Request) er
|
|||
|
||||
//--------------------------------------
|
||||
// State non-sensitive handlers
|
||||
// will not dispatch to leader
|
||||
// TODO: add sensitive version for these
|
||||
// command?
|
||||
// command with consistent option will
|
||||
// still dispatch to the leader
|
||||
//--------------------------------------
|
||||
|
||||
// Handler to return the current leader's raft address
|
||||
|
|
|
@ -16,6 +16,9 @@ const (
|
|||
Delete = "delete"
|
||||
TestAndSet = "testAndSet"
|
||||
Expire = "expire"
|
||||
)
|
||||
|
||||
const (
|
||||
UndefIndex = 0
|
||||
UndefTerm = 0
|
||||
)
|
||||
|
|
135
store/store.go
135
store/store.go
|
@ -97,67 +97,7 @@ func (s *Store) Get(nodePath string, recursive, sorted bool, index uint64, term
|
|||
func (s *Store) Create(nodePath string, value string, expireTime time.Time, index uint64, term uint64) (*Event, error) {
|
||||
s.worldLock.Lock()
|
||||
defer s.worldLock.Unlock()
|
||||
|
||||
nodePath = path.Clean(path.Join("/", nodePath))
|
||||
|
||||
// make sure we can create the node
|
||||
_, err := s.internalGet(nodePath, index, term)
|
||||
|
||||
if err == nil { // key already exists
|
||||
s.Stats.Inc(SetFail)
|
||||
return nil, etcdErr.NewError(etcdErr.EcodeNodeExist, nodePath)
|
||||
}
|
||||
|
||||
etcdError, _ := err.(etcdErr.Error)
|
||||
|
||||
if etcdError.ErrorCode == 104 { // we cannot create the key due to meet a file while walking
|
||||
s.Stats.Inc(SetFail)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
dir, _ := path.Split(nodePath)
|
||||
|
||||
// walk through the nodePath, create dirs and get the last directory node
|
||||
d, err := s.walk(dir, s.checkDir)
|
||||
|
||||
if err != nil {
|
||||
s.Stats.Inc(SetFail)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
e := newEvent(Create, nodePath, s.Index, s.Term)
|
||||
|
||||
var n *Node
|
||||
|
||||
if len(value) != 0 { // create file
|
||||
e.Value = value
|
||||
|
||||
n = newFile(nodePath, value, s.Index, s.Term, d, "", expireTime)
|
||||
|
||||
} else { // create directory
|
||||
e.Dir = true
|
||||
|
||||
n = newDir(nodePath, s.Index, s.Term, d, "", expireTime)
|
||||
|
||||
}
|
||||
|
||||
err = d.Add(n)
|
||||
|
||||
if err != nil {
|
||||
s.Stats.Inc(SetFail)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Node with TTL
|
||||
if expireTime.Sub(Permanent) != 0 {
|
||||
n.Expire(s)
|
||||
e.Expiration = &n.ExpireTime
|
||||
e.TTL = int64(expireTime.Sub(time.Now())/time.Second) + 1
|
||||
}
|
||||
|
||||
s.WatcherHub.notify(e)
|
||||
s.Stats.Inc(SetSuccess)
|
||||
return e, nil
|
||||
return s.internalCreate(nodePath, value, expireTime, index, term, Create)
|
||||
}
|
||||
|
||||
// Update function updates the value/ttl of the node.
|
||||
|
@ -197,8 +137,10 @@ func (s *Store) Update(nodePath string, value string, expireTime time.Time, inde
|
|||
// update ttl
|
||||
n.UpdateTTL(expireTime, s)
|
||||
|
||||
e.Expiration = &n.ExpireTime
|
||||
e.TTL = int64(expireTime.Sub(time.Now())/time.Second) + 1
|
||||
if n.ExpireTime.Sub(Permanent) != 0 {
|
||||
e.Expiration = &n.ExpireTime
|
||||
e.TTL = int64(expireTime.Sub(time.Now())/time.Second) + 1
|
||||
}
|
||||
s.WatcherHub.notify(e)
|
||||
|
||||
s.Stats.Inc(UpdateSuccess)
|
||||
|
@ -212,6 +154,10 @@ func (s *Store) TestAndSet(nodePath string, prevValue string, prevIndex uint64,
|
|||
s.worldLock.Lock()
|
||||
defer s.worldLock.Unlock()
|
||||
|
||||
if prevValue == "" && prevIndex == 0 { // try create
|
||||
return s.internalCreate(nodePath, value, expireTime, index, term, TestAndSet)
|
||||
}
|
||||
|
||||
n, err := s.internalGet(nodePath, index, term)
|
||||
|
||||
if err != nil {
|
||||
|
@ -316,6 +262,69 @@ func (s *Store) walk(nodePath string, walkFunc func(prev *Node, component string
|
|||
return curr, nil
|
||||
}
|
||||
|
||||
func (s *Store) internalCreate(nodePath string, value string, expireTime time.Time, index uint64, term uint64, action string) (*Event, error) {
|
||||
nodePath = path.Clean(path.Join("/", nodePath))
|
||||
|
||||
// make sure we can create the node
|
||||
_, err := s.internalGet(nodePath, index, term)
|
||||
|
||||
if err == nil { // key already exists
|
||||
s.Stats.Inc(SetFail)
|
||||
return nil, etcdErr.NewError(etcdErr.EcodeNodeExist, nodePath)
|
||||
}
|
||||
|
||||
etcdError, _ := err.(etcdErr.Error)
|
||||
|
||||
if etcdError.ErrorCode == 104 { // we cannot create the key due to meet a file while walking
|
||||
s.Stats.Inc(SetFail)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
dir, _ := path.Split(nodePath)
|
||||
|
||||
// walk through the nodePath, create dirs and get the last directory node
|
||||
d, err := s.walk(dir, s.checkDir)
|
||||
|
||||
if err != nil {
|
||||
s.Stats.Inc(SetFail)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
e := newEvent(action, nodePath, s.Index, s.Term)
|
||||
|
||||
var n *Node
|
||||
|
||||
if len(value) != 0 { // create file
|
||||
e.Value = value
|
||||
|
||||
n = newFile(nodePath, value, s.Index, s.Term, d, "", expireTime)
|
||||
|
||||
} else { // create directory
|
||||
e.Dir = true
|
||||
|
||||
n = newDir(nodePath, s.Index, s.Term, d, "", expireTime)
|
||||
|
||||
}
|
||||
|
||||
err = d.Add(n)
|
||||
|
||||
if err != nil {
|
||||
s.Stats.Inc(SetFail)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Node with TTL
|
||||
if expireTime.Sub(Permanent) != 0 {
|
||||
n.Expire(s)
|
||||
e.Expiration = &n.ExpireTime
|
||||
e.TTL = int64(expireTime.Sub(time.Now())/time.Second) + 1
|
||||
}
|
||||
|
||||
s.WatcherHub.notify(e)
|
||||
s.Stats.Inc(SetSuccess)
|
||||
return e, nil
|
||||
}
|
||||
|
||||
// InternalGet function get the node of the given nodePath.
|
||||
func (s *Store) internalGet(nodePath string, index uint64, term uint64) (*Node, error) {
|
||||
nodePath = path.Clean(path.Join("/", nodePath))
|
||||
|
|
Loading…
Reference in New Issue