Merge pull request #238 from xiangli-cmu/0.2-handlers

0.2 handlers
release-0.4
Xiang Li 2013-10-16 10:56:39 -07:00
commit f5fa89c0e1
31 changed files with 386 additions and 518 deletions

View File

@ -45,7 +45,7 @@ func (r *Registry) Register(name string, peerVersion string, peerURL string, url
// Write data to store.
key := path.Join(RegistryKey, name)
value := fmt.Sprintf("raft=%s&etcd=%s&raftVersion=%s", peerURL, url, peerVersion)
_, err := r.store.Create(key, value, false, false, store.Permanent, commitIndex, term)
_, err := r.store.Create(key, value, false, store.Permanent, commitIndex, term)
log.Debugf("Register: %s (%v)", name, err)
return err
}

View File

@ -102,10 +102,10 @@ func (s *Server) installV1() {
}
func (s *Server) installV2() {
s.handleFuncV2("/v2/keys/{key:.*}", v2.GetKeyHandler).Methods("GET")
s.handleFuncV2("/v2/keys/{key:.*}", v2.CreateKeyHandler).Methods("POST")
s.handleFuncV2("/v2/keys/{key:.*}", v2.UpdateKeyHandler).Methods("PUT")
s.handleFuncV2("/v2/keys/{key:.*}", v2.DeleteKeyHandler).Methods("DELETE")
s.handleFuncV2("/v2/keys/{key:.*}", v2.GetHandler).Methods("GET")
s.handleFuncV2("/v2/keys/{key:.*}", v2.PostHandler).Methods("POST")
s.handleFuncV2("/v2/keys/{key:.*}", v2.PutHandler).Methods("PUT")
s.handleFuncV2("/v2/keys/{key:.*}", v2.DeleteHandler).Methods("DELETE")
s.handleFunc("/v2/leader", s.GetLeaderHandler).Methods("GET")
s.handleFunc("/v2/machines", s.GetMachinesHandler).Methods("GET")
s.handleFunc("/v2/stats/self", s.GetStatsHandler).Methods("GET")
@ -254,7 +254,7 @@ func (s *Server) SpeedTestHandler(w http.ResponseWriter, req *http.Request) erro
for i := 0; i < count; i++ {
go func() {
for j := 0; j < 10; j++ {
c := &store.UpdateCommand{
c := &store.SetCommand{
Key: "foo",
Value: "bar",
ExpireTime: time.Unix(0, 0),

View File

@ -31,7 +31,7 @@ func SetKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error {
// If the "prevValue" is specified then test-and-set. Otherwise create a new key.
var c raft.Command
if prevValueArr, ok := req.Form["prevValue"]; ok && len(prevValueArr) > 0 {
c = &store.TestAndSetCommand{
c = &store.CompareAndSwapCommand{
Key: key,
Value: value,
PrevValue: prevValueArr[0],
@ -39,11 +39,10 @@ func SetKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error {
}
} else {
c = &store.CreateCommand{
c = &store.SetCommand{
Key: key,
Value: value,
ExpireTime: expireTime,
Force: true,
}
}

View File

@ -7,7 +7,7 @@ import (
"github.com/gorilla/mux"
)
func DeleteKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error {
func DeleteHandler(w http.ResponseWriter, req *http.Request, s Server) error {
vars := mux.Vars(req)
key := "/" + vars["key"]

View File

@ -13,7 +13,7 @@ import (
"github.com/gorilla/mux"
)
func GetKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error {
func GetHandler(w http.ResponseWriter, req *http.Request, s Server) error {
var err error
var event *store.Event
@ -25,7 +25,7 @@ func GetKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error {
leader := s.Leader()
hostname, _ := s.PeerURL(leader)
url := hostname + req.URL.Path
log.Debugf("Redirect to %s", url)
log.Debugf("Redirect consistent get to %s", url)
http.Redirect(w, req, url, http.StatusTemporaryRedirect)
return nil
}
@ -36,8 +36,10 @@ func GetKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error {
if req.FormValue("wait") == "true" { // watch
// Create a command to watch from a given index (default 0).
var sinceIndex uint64 = 0
if req.Method == "POST" {
sinceIndex, err = strconv.ParseUint(string(req.FormValue("wait_index")), 10, 64)
waitIndex := req.FormValue("waitIndex")
if waitIndex != "" {
sinceIndex, err = strconv.ParseUint(string(req.FormValue("waitIndex")), 10, 64)
if err != nil {
return etcdErr.NewError(etcdErr.EcodeIndexNaN, "Watch From Index", store.UndefIndex, store.UndefTerm)
}

View File

@ -8,7 +8,7 @@ import (
"github.com/gorilla/mux"
)
func CreateKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error {
func PostHandler(w http.ResponseWriter, req *http.Request, s Server) error {
vars := mux.Vars(req)
key := "/" + vars["key"]
@ -19,10 +19,10 @@ func CreateKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error
}
c := &store.CreateCommand{
Key: key,
Value: value,
ExpireTime: expireTime,
IncrementalSuffix: (req.FormValue("incremental") == "true"),
Key: key,
Value: value,
ExpireTime: expireTime,
Unique: true,
}
return s.Dispatch(c, w, req)

109
server/v2/put_handler.go Normal file
View File

@ -0,0 +1,109 @@
package v2
import (
"net/http"
"strconv"
"time"
etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/store"
"github.com/coreos/go-raft"
"github.com/gorilla/mux"
)
func PutHandler(w http.ResponseWriter, req *http.Request, s Server) error {
vars := mux.Vars(req)
key := "/" + vars["key"]
req.ParseForm()
value := req.Form.Get("value")
expireTime, err := store.TTL(req.Form.Get("ttl"))
if err != nil {
return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Update", store.UndefIndex, store.UndefTerm)
}
prevValue, valueOk := req.Form["prevValue"]
prevIndexStr, indexOk := req.Form["prevIndex"]
prevExist, existOk := req.Form["prevExist"]
var c raft.Command
// Set handler: create a new node or replace the old one.
if !valueOk && !indexOk && !existOk {
return SetHandler(w, req, s, key, value, expireTime)
}
// update with test
if existOk {
if prevExist[0] == "false" {
// Create command: create a new node. Fail, if a node already exists
// Ignore prevIndex and prevValue
return CreateHandler(w, req, s, key, value, expireTime)
}
if prevExist[0] == "true" && !indexOk && !valueOk {
return UpdateHandler(w, req, s, key, value, expireTime)
}
}
var prevIndex uint64
if indexOk {
prevIndex, err = strconv.ParseUint(prevIndexStr[0], 10, 64)
// bad previous index
if err != nil {
return etcdErr.NewError(etcdErr.EcodeIndexNaN, "CompareAndSwap", store.UndefIndex, store.UndefTerm)
}
} else {
prevIndex = 0
}
if valueOk {
if prevValue[0] == "" {
return etcdErr.NewError(etcdErr.EcodePrevValueRequired, "CompareAndSwap", store.UndefIndex, store.UndefTerm)
}
}
c = &store.CompareAndSwapCommand{
Key: key,
Value: value,
PrevValue: prevValue[0],
PrevIndex: prevIndex,
}
return s.Dispatch(c, w, req)
}
func SetHandler(w http.ResponseWriter, req *http.Request, s Server, key, value string, expireTime time.Time) error {
c := &store.SetCommand{
Key: key,
Value: value,
ExpireTime: expireTime,
}
return s.Dispatch(c, w, req)
}
func CreateHandler(w http.ResponseWriter, req *http.Request, s Server, key, value string, expireTime time.Time) error {
c := &store.CreateCommand{
Key: key,
Value: value,
ExpireTime: expireTime,
}
return s.Dispatch(c, w, req)
}
func UpdateHandler(w http.ResponseWriter, req *http.Request, s Server, key, value string, expireTime time.Time) error {
// Update should give at least one option
if value == "" && expireTime.Sub(store.Permanent) == 0 {
return etcdErr.NewError(etcdErr.EcodeValueOrTTLRequired, "Update", store.UndefIndex, store.UndefTerm)
}
c := &store.UpdateCommand{
Key: key,
Value: value,
ExpireTime: expireTime,
}
return s.Dispatch(c, w, req)
}

View File

@ -1,64 +0,0 @@
package v2
import (
"net/http"
"strconv"
etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/store"
"github.com/coreos/go-raft"
"github.com/gorilla/mux"
)
func UpdateKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error {
vars := mux.Vars(req)
key := "/" + vars["key"]
req.ParseForm()
value := req.Form.Get("value")
expireTime, err := store.TTL(req.Form.Get("ttl"))
if err != nil {
return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Update", store.UndefIndex, store.UndefTerm)
}
// Update should give at least one option
if value == "" && expireTime.Sub(store.Permanent) == 0 {
return etcdErr.NewError(etcdErr.EcodeValueOrTTLRequired, "Update", store.UndefIndex, store.UndefTerm)
}
prevValue, valueOk := req.Form["prevValue"]
prevIndexStr, indexOk := req.Form["prevIndex"]
var c raft.Command
if !valueOk && !indexOk { // update without test
c = &store.UpdateCommand{
Key: key,
Value: value,
ExpireTime: expireTime,
}
} else { // update with test
var prevIndex uint64
if indexOk {
prevIndex, err = strconv.ParseUint(prevIndexStr[0], 10, 64)
// bad previous index
if err != nil {
return etcdErr.NewError(etcdErr.EcodeIndexNaN, "Update", store.UndefIndex, store.UndefTerm)
}
} else {
prevIndex = 0
}
c = &store.TestAndSetCommand{
Key: key,
Value: value,
PrevValue: prevValue[0],
PrevIndex: prevIndex,
}
}
return s.Dispatch(c, w, req)
}

View File

@ -8,11 +8,11 @@ import (
)
func init() {
raft.RegisterCommand(&TestAndSetCommand{})
raft.RegisterCommand(&CompareAndSwapCommand{})
}
// The TestAndSetCommand performs a conditional update on a key in the store.
type TestAndSetCommand struct {
// The CompareAndSwap performs a conditional update on a key in the store.
type CompareAndSwapCommand struct {
Key string `json:"key"`
Value string `json:"value"`
ExpireTime time.Time `json:"expireTime"`
@ -21,15 +21,15 @@ type TestAndSetCommand struct {
}
// The name of the testAndSet command in the log
func (c *TestAndSetCommand) CommandName() string {
return "etcd:testAndSet"
func (c *CompareAndSwapCommand) CommandName() string {
return "etcd:compareAndSwap"
}
// Set the key-value pair if the current value of the key equals to the given prevValue
func (c *TestAndSetCommand) Apply(server raft.Server) (interface{}, error) {
func (c *CompareAndSwapCommand) Apply(server raft.Server) (interface{}, error) {
s, _ := server.StateMachine().(Store)
e, err := s.TestAndSet(c.Key, c.PrevValue, c.PrevIndex,
e, err := s.CompareAndSwap(c.Key, c.PrevValue, c.PrevIndex,
c.Value, c.ExpireTime, server.CommitIndex(), server.Term())
if err != nil {

View File

@ -12,11 +12,10 @@ func init() {
// Create command
type CreateCommand struct {
Key string `json:"key"`
Value string `json:"value"`
ExpireTime time.Time `json:"expireTime"`
IncrementalSuffix bool `json:"incrementalSuffix"`
Force bool `json:"force"`
Key string `json:"key"`
Value string `json:"value"`
ExpireTime time.Time `json:"expireTime"`
Unique bool `json:"unique"`
}
// The name of the create command in the log
@ -28,7 +27,7 @@ func (c *CreateCommand) CommandName() string {
func (c *CreateCommand) Apply(server raft.Server) (interface{}, error) {
s, _ := server.StateMachine().(Store)
e, err := s.Create(c.Key, c.Value, c.IncrementalSuffix, c.Force, c.ExpireTime, server.CommitIndex(), server.Term())
e, err := s.Create(c.Key, c.Value, c.Unique, c.ExpireTime, server.CommitIndex(), server.Term())
if err != nil {
log.Debug(err)

View File

@ -5,12 +5,13 @@ import (
)
const (
Get = "get"
Create = "create"
Update = "update"
Delete = "delete"
TestAndSet = "testAndSet"
Expire = "expire"
Get = "get"
Create = "create"
Set = "set"
Update = "update"
Delete = "delete"
CompareAndSwap = "compareAndSwap"
Expire = "expire"
)
const (
@ -54,7 +55,7 @@ func (event *Event) Response() interface{} {
Expiration: event.Expiration,
}
if response.Action == Create || response.Action == Update {
if response.Action == Create || response.Action == Set {
response.Action = "set"
if response.PrevValue == "" {
response.NewKey = true

38
store/set_command.go Normal file
View File

@ -0,0 +1,38 @@
package store
import (
"github.com/coreos/etcd/log"
"github.com/coreos/go-raft"
"time"
)
func init() {
raft.RegisterCommand(&SetCommand{})
}
// Create command
type SetCommand struct {
Key string `json:"key"`
Value string `json:"value"`
ExpireTime time.Time `json:"expireTime"`
}
// The name of the create command in the log
func (c *SetCommand) CommandName() string {
return "etcd:set"
}
// Create node
func (c *SetCommand) Apply(server raft.Server) (interface{}, error) {
s, _ := server.StateMachine().(Store)
// create a new node or replace the old node.
e, err := s.Set(c.Key, c.Value, c.ExpireTime, server.CommitIndex(), server.Term())
if err != nil {
log.Debug(err)
return nil, err
}
return e, nil
}

View File

@ -6,17 +6,19 @@ import (
)
const (
SetSuccess = 100
SetFail = 101
DeleteSuccess = 102
DeleteFail = 103
UpdateSuccess = 104
UpdateFail = 105
TestAndSetSuccess = 106
TestAndSetFail = 107
GetSuccess = 110
GetFail = 111
ExpireCount = 112
SetSuccess = iota
SetFail
DeleteSuccess
DeleteFail
CreateSuccess
CreateFail
UpdateSuccess
UpdateFail
CompareAndSwapSuccess
CompareAndSwapFail
GetSuccess
GetFail
ExpireCount
)
type Stats struct {
@ -37,10 +39,15 @@ type Stats struct {
UpdateSuccess uint64 `json:"updateSuccess"`
UpdateFail uint64 `json:"updateFail"`
// Number of create requests
CreateSuccess uint64 `json:"createSuccess"`
CreateFail uint64 `json:createFail`
// Number of testAndSet requests
TestAndSetSuccess uint64 `json:"testAndSetSuccess"`
TestAndSetFail uint64 `json:"testAndSetFail"`
ExpireCount uint64 `json:"expireCount"`
CompareAndSwapSuccess uint64 `json:"compareAndSwapSuccess"`
CompareAndSwapFail uint64 `json:"compareAndSwapFail"`
ExpireCount uint64 `json:"expireCount"`
Watchers uint64 `json:"watchers"`
}
@ -52,8 +59,8 @@ func newStats() *Stats {
func (s *Stats) clone() *Stats {
return &Stats{s.GetSuccess, s.GetFail, s.SetSuccess, s.SetFail,
s.DeleteSuccess, s.DeleteFail, s.UpdateSuccess, s.UpdateFail,
s.TestAndSetSuccess, s.TestAndSetFail, s.Watchers, s.ExpireCount}
s.DeleteSuccess, s.DeleteFail, s.UpdateSuccess, s.UpdateFail, s.CreateSuccess,
s.CreateFail, s.CompareAndSwapSuccess, s.CompareAndSwapFail, s.Watchers, s.ExpireCount}
}
// Status() return the statistics info of etcd storage its recent start
@ -69,7 +76,7 @@ func (s *Stats) TotalReads() uint64 {
func (s *Stats) TotalWrites() uint64 {
return s.SetSuccess + s.SetFail +
s.DeleteSuccess + s.DeleteFail +
s.TestAndSetSuccess + s.TestAndSetFail +
s.CompareAndSwapSuccess + s.CompareAndSwapFail +
s.UpdateSuccess + s.UpdateFail
}
@ -79,6 +86,10 @@ func (s *Stats) Inc(field int) {
atomic.AddUint64(&s.SetSuccess, 1)
case SetFail:
atomic.AddUint64(&s.SetFail, 1)
case CreateSuccess:
atomic.AddUint64(&s.CreateSuccess, 1)
case CreateFail:
atomic.AddUint64(&s.CreateFail, 1)
case DeleteSuccess:
atomic.AddUint64(&s.DeleteSuccess, 1)
case DeleteFail:
@ -91,10 +102,10 @@ func (s *Stats) Inc(field int) {
atomic.AddUint64(&s.UpdateSuccess, 1)
case UpdateFail:
atomic.AddUint64(&s.UpdateFail, 1)
case TestAndSetSuccess:
atomic.AddUint64(&s.TestAndSetSuccess, 1)
case TestAndSetFail:
atomic.AddUint64(&s.TestAndSetFail, 1)
case CompareAndSwapSuccess:
atomic.AddUint64(&s.CompareAndSwapSuccess, 1)
case CompareAndSwapFail:
atomic.AddUint64(&s.CompareAndSwapFail, 1)
case ExpireCount:
atomic.AddUint64(&s.ExpireCount, 1)
}

View File

@ -11,16 +11,16 @@ func TestBasicStats(t *testing.T) {
keys := GenKeys(rand.Intn(100), 5)
var i uint64
var GetSuccess, GetFail, SetSuccess, SetFail, DeleteSuccess, DeleteFail uint64
var UpdateSuccess, UpdateFail, TestAndSetSuccess, TestAndSetFail, watcher_number uint64
var GetSuccess, GetFail, CreateSuccess, CreateFail, DeleteSuccess, DeleteFail uint64
var UpdateSuccess, UpdateFail, CompareAndSwapSuccess, CompareAndSwapFail, watcher_number uint64
for _, k := range keys {
i++
_, err := s.Create(k, "bar", false, false, time.Now().Add(time.Second*time.Duration(rand.Intn(6))), i, 1)
_, err := s.Create(k, "bar", false, time.Now().Add(time.Second*time.Duration(rand.Intn(6))), i, 1)
if err != nil {
SetFail++
CreateFail++
} else {
SetSuccess++
CreateSuccess++
}
}
@ -58,11 +58,11 @@ func TestBasicStats(t *testing.T) {
for _, k := range keys {
i++
_, err := s.TestAndSet(k, "foo", 0, "bar", Permanent, i, 1)
_, err := s.CompareAndSwap(k, "foo", 0, "bar", Permanent, i, 1)
if err != nil {
TestAndSetFail++
CompareAndSwapFail++
} else {
TestAndSetSuccess++
CompareAndSwapSuccess++
}
}
@ -108,12 +108,12 @@ func TestBasicStats(t *testing.T) {
t.Fatalf("GetFail [%d] != Stats.GetFail [%d]", GetFail, s.Stats.GetFail)
}
if SetSuccess != s.Stats.SetSuccess {
t.Fatalf("SetSuccess [%d] != Stats.SetSuccess [%d]", SetSuccess, s.Stats.SetSuccess)
if CreateSuccess != s.Stats.CreateSuccess {
t.Fatalf("CreateSuccess [%d] != Stats.CreateSuccess [%d]", CreateSuccess, s.Stats.CreateSuccess)
}
if SetFail != s.Stats.SetFail {
t.Fatalf("SetFail [%d] != Stats.SetFail [%d]", SetFail, s.Stats.SetFail)
if CreateFail != s.Stats.CreateFail {
t.Fatalf("CreateFail [%d] != Stats.CreateFail [%d]", CreateFail, s.Stats.CreateFail)
}
if DeleteSuccess != s.Stats.DeleteSuccess {
@ -132,31 +132,31 @@ func TestBasicStats(t *testing.T) {
t.Fatalf("UpdateFail [%d] != Stats.UpdateFail [%d]", UpdateFail, s.Stats.UpdateFail)
}
if TestAndSetSuccess != s.Stats.TestAndSetSuccess {
t.Fatalf("TestAndSetSuccess [%d] != Stats.TestAndSetSuccess [%d]", TestAndSetSuccess, s.Stats.TestAndSetSuccess)
if CompareAndSwapSuccess != s.Stats.CompareAndSwapSuccess {
t.Fatalf("TestAndSetSuccess [%d] != Stats.CompareAndSwapSuccess [%d]", CompareAndSwapSuccess, s.Stats.CompareAndSwapSuccess)
}
if TestAndSetFail != s.Stats.TestAndSetFail {
t.Fatalf("TestAndSetFail [%d] != Stats.TestAndSetFail [%d]", TestAndSetFail, s.Stats.TestAndSetFail)
if CompareAndSwapFail != s.Stats.CompareAndSwapFail {
t.Fatalf("TestAndSetFail [%d] != Stats.TestAndSetFail [%d]", CompareAndSwapFail, s.Stats.CompareAndSwapFail)
}
s = newStore()
SetSuccess = 0
SetFail = 0
CreateSuccess = 0
CreateFail = 0
for _, k := range keys {
i++
_, err := s.Create(k, "bar", false, false, time.Now().Add(time.Second*3), i, 1)
_, err := s.Create(k, "bar", false, time.Now().Add(time.Second*3), i, 1)
if err != nil {
SetFail++
CreateFail++
} else {
SetSuccess++
CreateSuccess++
}
}
time.Sleep(6 * time.Second)
ExpireCount := SetSuccess
ExpireCount := CreateSuccess
if ExpireCount != s.Stats.ExpireCount {
t.Fatalf("ExpireCount [%d] != Stats.ExpireCount [%d]", ExpireCount, s.Stats.ExpireCount)

View File

@ -15,10 +15,11 @@ import (
type Store interface {
Get(nodePath string, recursive, sorted bool, index uint64, term uint64) (*Event, error)
Create(nodePath string, value string, incrementalSuffix bool, force bool,
expireTime time.Time, index uint64, term uint64) (*Event, error)
Set(nodePath string, value string, expireTime time.Time, index uint64, term uint64) (*Event, error)
Update(nodePath string, newValue string, expireTime time.Time, index uint64, term uint64) (*Event, error)
TestAndSet(nodePath string, prevValue string, prevIndex uint64,
Create(nodePath string, value string, incrementalSuffix bool, expireTime time.Time,
index uint64, term uint64) (*Event, error)
CompareAndSwap(nodePath string, prevValue string, prevIndex uint64,
value string, expireTime time.Time, index uint64, term uint64) (*Event, error)
Delete(nodePath string, recursive bool, index uint64, term uint64) (*Event, error)
Watch(prefix string, recursive bool, sinceIndex uint64, index uint64, term uint64) (<-chan *Event, error)
@ -107,56 +108,41 @@ func (s *store) Get(nodePath string, recursive, sorted bool, index uint64, term
// Create function creates the Node at nodePath. Create will help to create intermediate directories with no ttl.
// If the node has already existed, create will fail.
// If any node on the path is a file, create will fail.
func (s *store) Create(nodePath string, value string, incrementalSuffix bool, force bool,
func (s *store) Create(nodePath string, value string, unique bool,
expireTime time.Time, index uint64, term uint64) (*Event, error) {
nodePath = path.Clean(path.Join("/", nodePath))
s.worldLock.Lock()
defer s.worldLock.Unlock()
return s.internalCreate(nodePath, value, incrementalSuffix, force, expireTime, index, term, Create)
e, err := s.internalCreate(nodePath, value, unique, false, expireTime, index, term, Create)
if err == nil {
s.Stats.Inc(CreateSuccess)
} else {
s.Stats.Inc(CreateFail)
}
return e, err
}
// Update function updates the value/ttl of the node.
// If the node is a file, the value and the ttl can be updated.
// If the node is a directory, only the ttl can be updated.
func (s *store) Update(nodePath string, newValue string, expireTime time.Time, index uint64, term uint64) (*Event, error) {
s.worldLock.Lock()
defer s.worldLock.Unlock()
// Set function creates or replace the Node at nodePath.
func (s *store) Set(nodePath string, value string, expireTime time.Time, index uint64, term uint64) (*Event, error) {
nodePath = path.Clean(path.Join("/", nodePath))
n, err := s.internalGet(nodePath, index, term)
s.worldLock.Lock()
defer s.worldLock.Unlock()
e, err := s.internalCreate(nodePath, value, false, true, expireTime, index, term, Set)
if err != nil { // if the node does not exist, return error
s.Stats.Inc(UpdateFail)
return nil, err
if err == nil {
s.Stats.Inc(SetSuccess)
} else {
s.Stats.Inc(SetFail)
}
e := newEvent(Update, nodePath, s.Index, s.Term)
if len(newValue) != 0 {
if n.IsDir() {
// if the node is a directory, we cannot update value
s.Stats.Inc(UpdateFail)
return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, index, term)
}
e.PrevValue = n.Value
n.Write(newValue, index, term)
}
// update ttl
n.UpdateTTL(expireTime)
e.Expiration, e.TTL = n.ExpirationAndTTL()
s.WatcherHub.notify(e)
s.Stats.Inc(UpdateSuccess)
return e, nil
return e, err
}
func (s *store) TestAndSet(nodePath string, prevValue string, prevIndex uint64,
func (s *store) CompareAndSwap(nodePath string, prevValue string, prevIndex uint64,
value string, expireTime time.Time, index uint64, term uint64) (*Event, error) {
nodePath = path.Clean(path.Join("/", nodePath))
@ -164,26 +150,22 @@ func (s *store) TestAndSet(nodePath string, prevValue string, prevIndex uint64,
s.worldLock.Lock()
defer s.worldLock.Unlock()
if prevValue == "" && prevIndex == 0 { // try create
return s.internalCreate(nodePath, value, false, false, expireTime, index, term, TestAndSet)
}
n, err := s.internalGet(nodePath, index, term)
if err != nil {
s.Stats.Inc(TestAndSetFail)
s.Stats.Inc(CompareAndSwapFail)
return nil, err
}
if n.IsDir() { // can only test and set file
s.Stats.Inc(TestAndSetFail)
s.Stats.Inc(CompareAndSwapFail)
return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, index, term)
}
// If both of the prevValue and prevIndex are given, we will test both of them.
// Command will be executed, only if both of the tests are successful.
if (prevValue == "" || n.Value == prevValue) && (prevIndex == 0 || n.ModifiedIndex == prevIndex) {
e := newEvent(TestAndSet, nodePath, index, term)
e := newEvent(CompareAndSwap, nodePath, index, term)
e.PrevValue = n.Value
// if test succeed, write the value
@ -194,12 +176,12 @@ func (s *store) TestAndSet(nodePath string, prevValue string, prevIndex uint64,
e.Expiration, e.TTL = n.ExpirationAndTTL()
s.WatcherHub.notify(e)
s.Stats.Inc(TestAndSetSuccess)
s.Stats.Inc(CompareAndSwapSuccess)
return e, nil
}
cause := fmt.Sprintf("[%v != %v] [%v != %v]", prevValue, n.Value, prevIndex, n.ModifiedIndex)
s.Stats.Inc(TestAndSetFail)
s.Stats.Inc(CompareAndSwapFail)
return nil, etcdErr.NewError(etcdErr.EcodeTestFailed, cause, index, term)
}
@ -293,13 +275,53 @@ func (s *store) walk(nodePath string, walkFunc func(prev *Node, component string
return curr, nil
}
func (s *store) internalCreate(nodePath string, value string, incrementalSuffix bool, force bool,
// Update function updates the value/ttl of the node.
// If the node is a file, the value and the ttl can be updated.
// If the node is a directory, only the ttl can be updated.
func (s *store) Update(nodePath string, newValue string, expireTime time.Time, index uint64, term uint64) (*Event, error) {
s.worldLock.Lock()
defer s.worldLock.Unlock()
nodePath = path.Clean(path.Join("/", nodePath))
n, err := s.internalGet(nodePath, index, term)
if err != nil { // if the node does not exist, return error
s.Stats.Inc(UpdateFail)
return nil, err
}
e := newEvent(Update, nodePath, s.Index, s.Term)
if len(newValue) != 0 {
if n.IsDir() {
// if the node is a directory, we cannot update value
s.Stats.Inc(UpdateFail)
return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, index, term)
}
e.PrevValue = n.Value
n.Write(newValue, index, term)
}
// update ttl
n.UpdateTTL(expireTime)
e.Expiration, e.TTL = n.ExpirationAndTTL()
s.WatcherHub.notify(e)
s.Stats.Inc(UpdateSuccess)
return e, nil
}
func (s *store) internalCreate(nodePath string, value string, unique bool, replace bool,
expireTime time.Time, index uint64, term uint64, action string) (*Event, error) {
s.Index, s.Term = index, term
if incrementalSuffix { // append unique incremental suffix to the node path
nodePath += "_" + strconv.FormatUint(index, 10)
if unique { // append unique item under the node path
nodePath += "/" + strconv.FormatUint(index, 10)
}
nodePath = path.Clean(path.Join("/", nodePath))
@ -321,7 +343,7 @@ func (s *store) internalCreate(nodePath string, value string, incrementalSuffix
// force will try to replace a existing file
if n != nil {
if force {
if replace {
if n.IsDir() {
return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, index, term)
}
@ -345,12 +367,8 @@ func (s *store) internalCreate(nodePath string, value string, incrementalSuffix
}
err = d.Add(n)
if err != nil {
s.Stats.Inc(SetFail)
return nil, err
}
// we are sure d is a directory and does not have the children with name n.Name
d.Add(n)
// Node with TTL
if expireTime.Sub(Permanent) != 0 {
@ -359,7 +377,6 @@ func (s *store) internalCreate(nodePath string, value string, incrementalSuffix
}
s.WatcherHub.notify(e)
s.Stats.Inc(SetSuccess)
return e, nil
}

View File

@ -10,10 +10,10 @@ import (
func TestCreateAndGet(t *testing.T) {
s := newStore()
s.Create("/foobar", "bar", false, false, Permanent, 1, 1)
s.Create("/foobar", "bar", false, Permanent, 1, 1)
// already exist, create should fail
_, err := s.Create("/foobar", "bar", false, false, Permanent, 1, 1)
_, err := s.Create("/foobar", "bar", false, Permanent, 1, 1)
if err == nil {
t.Fatal("Create should fail")
@ -21,10 +21,10 @@ func TestCreateAndGet(t *testing.T) {
s.Delete("/foobar", true, 1, 1)
s.Create("/foobar/foo", "bar", false, false, Permanent, 1, 1)
s.Create("/foobar/foo", "bar", false, Permanent, 1, 1)
// already exist, create should fail
_, err = s.Create("/foobar", "bar", false, false, Permanent, 1, 1)
_, err = s.Create("/foobar", "bar", false, Permanent, 1, 1)
if err == nil {
t.Fatal("Create should fail")
@ -38,14 +38,14 @@ func TestCreateAndGet(t *testing.T) {
createAndGet(s, "/foo/foo/bar", t)
// meet file, create should fail
_, err = s.Create("/foo/bar/bar", "bar", false, false, Permanent, 2, 1)
_, err = s.Create("/foo/bar/bar", "bar", false, Permanent, 2, 1)
if err == nil {
t.Fatal("Create should fail")
}
// create a directory
_, err = s.Create("/fooDir", "", false, false, Permanent, 3, 1)
_, err = s.Create("/fooDir", "", false, Permanent, 3, 1)
if err != nil {
t.Fatal("Cannot create /fooDir")
@ -58,7 +58,7 @@ func TestCreateAndGet(t *testing.T) {
}
// create a file under directory
_, err = s.Create("/fooDir/bar", "bar", false, false, Permanent, 4, 1)
_, err = s.Create("/fooDir/bar", "bar", false, Permanent, 4, 1)
if err != nil {
t.Fatal("Cannot create /fooDir/bar = bar")
@ -68,7 +68,7 @@ func TestCreateAndGet(t *testing.T) {
func TestUpdateFile(t *testing.T) {
s := newStore()
_, err := s.Create("/foo/bar", "bar", false, false, Permanent, 1, 1)
_, err := s.Create("/foo/bar", "bar", false, Permanent, 1, 1)
if err != nil {
t.Fatalf("cannot create %s=bar [%s]", "/foo/bar", err.Error())
@ -91,24 +91,24 @@ func TestUpdateFile(t *testing.T) {
}
// create a directory, update its ttl, to see if it will be deleted
_, err = s.Create("/foo/foo", "", false, false, Permanent, 3, 1)
_, err = s.Create("/foo/foo", "", false, Permanent, 3, 1)
if err != nil {
t.Fatalf("cannot create dir [%s] [%s]", "/foo/foo", err.Error())
}
_, err = s.Create("/foo/foo/foo1", "bar1", false, false, Permanent, 4, 1)
_, err = s.Create("/foo/foo/foo1", "bar1", false, Permanent, 4, 1)
if err != nil {
t.Fatal("cannot create [%s]", err.Error())
}
_, err = s.Create("/foo/foo/foo2", "", false, false, Permanent, 5, 1)
_, err = s.Create("/foo/foo/foo2", "", false, Permanent, 5, 1)
if err != nil {
t.Fatal("cannot create [%s]", err.Error())
}
_, err = s.Create("/foo/foo/foo2/boo", "boo1", false, false, Permanent, 6, 1)
_, err = s.Create("/foo/foo/foo2/boo", "boo1", false, Permanent, 6, 1)
if err != nil {
t.Fatal("cannot create [%s]", err.Error())
}
@ -165,11 +165,11 @@ func TestListDirectory(t *testing.T) {
// create dir /foo
// set key-value /foo/foo=bar
s.Create("/foo/foo", "bar", false, false, Permanent, 1, 1)
s.Create("/foo/foo", "bar", false, Permanent, 1, 1)
// create dir /foo/fooDir
// set key-value /foo/fooDir/foo=bar
s.Create("/foo/fooDir/foo", "bar", false, false, Permanent, 2, 1)
s.Create("/foo/fooDir/foo", "bar", false, Permanent, 2, 1)
e, err := s.Get("/foo", true, false, 2, 1)
@ -196,7 +196,7 @@ func TestListDirectory(t *testing.T) {
// create dir /foo/_hidden
// set key-value /foo/_hidden/foo -> bar
s.Create("/foo/_hidden/foo", "bar", false, false, Permanent, 3, 1)
s.Create("/foo/_hidden/foo", "bar", false, Permanent, 3, 1)
e, _ = s.Get("/foo", false, false, 2, 1)
@ -208,7 +208,7 @@ func TestListDirectory(t *testing.T) {
func TestRemove(t *testing.T) {
s := newStore()
s.Create("/foo", "bar", false, false, Permanent, 1, 1)
s.Create("/foo", "bar", false, Permanent, 1, 1)
_, err := s.Delete("/foo", false, 1, 1)
if err != nil {
@ -221,9 +221,9 @@ func TestRemove(t *testing.T) {
t.Fatalf("can get the node after deletion")
}
s.Create("/foo/bar", "bar", false, false, Permanent, 1, 1)
s.Create("/foo/car", "car", false, false, Permanent, 1, 1)
s.Create("/foo/dar/dar", "dar", false, false, Permanent, 1, 1)
s.Create("/foo/bar", "bar", false, Permanent, 1, 1)
s.Create("/foo/car", "car", false, Permanent, 1, 1)
s.Create("/foo/dar/dar", "dar", false, Permanent, 1, 1)
_, err = s.Delete("/foo", false, 1, 1)
@ -249,7 +249,7 @@ func TestExpire(t *testing.T) {
expire := time.Now().Add(time.Second)
s.Create("/foo", "bar", false, false, expire, 1, 1)
s.Create("/foo", "bar", false, expire, 1, 1)
_, err := s.Get("/foo", false, false, 1, 1)
@ -267,7 +267,7 @@ func TestExpire(t *testing.T) {
// test if we can reach the node before expiration
expire = time.Now().Add(time.Second)
s.Create("/foo", "bar", false, false, expire, 1, 1)
s.Create("/foo", "bar", false, expire, 1, 1)
time.Sleep(time.Millisecond * 50)
_, err = s.Get("/foo", false, false, 1, 1)
@ -278,7 +278,7 @@ func TestExpire(t *testing.T) {
expire = time.Now().Add(time.Second)
s.Create("/foo", "bar", false, false, expire, 1, 1)
s.Create("/foo", "bar", false, expire, 1, 1)
_, err = s.Delete("/foo", false, 1, 1)
if err != nil {
@ -286,18 +286,18 @@ func TestExpire(t *testing.T) {
}
}
func TestTestAndSet(t *testing.T) { // TODO prevValue == nil ?
func TestCompareAndSwap(t *testing.T) { // TODO prevValue == nil ?
s := newStore()
s.Create("/foo", "bar", false, false, Permanent, 1, 1)
s.Create("/foo", "bar", false, Permanent, 1, 1)
// test on wrong previous value
_, err := s.TestAndSet("/foo", "barbar", 0, "car", Permanent, 2, 1)
_, err := s.CompareAndSwap("/foo", "barbar", 0, "car", Permanent, 2, 1)
if err == nil {
t.Fatal("test and set should fail barbar != bar")
}
// test on value
e, err := s.TestAndSet("/foo", "bar", 0, "car", Permanent, 3, 1)
e, err := s.CompareAndSwap("/foo", "bar", 0, "car", Permanent, 3, 1)
if err != nil {
t.Fatal("test and set should succeed bar == bar")
@ -308,7 +308,7 @@ func TestTestAndSet(t *testing.T) { // TODO prevValue == nil ?
}
// test on index
e, err = s.TestAndSet("/foo", "", 3, "bar", Permanent, 4, 1)
e, err = s.CompareAndSwap("/foo", "", 3, "bar", Permanent, 4, 1)
if err != nil {
t.Fatal("test and set should succeed index 3 == 3")
@ -323,7 +323,7 @@ func TestWatch(t *testing.T) {
s := newStore()
// watch at a deeper path
c, _ := s.Watch("/foo/foo/foo", false, 0, 0, 1)
s.Create("/foo/foo/foo", "bar", false, false, Permanent, 1, 1)
s.Create("/foo/foo/foo", "bar", false, Permanent, 1, 1)
e := nonblockingRetrive(c)
if e.Key != "/foo/foo/foo" || e.Action != Create {
@ -338,10 +338,10 @@ func TestWatch(t *testing.T) {
}
c, _ = s.Watch("/foo/foo/foo", false, 0, 2, 1)
s.TestAndSet("/foo/foo/foo", "car", 0, "bar", Permanent, 3, 1)
s.CompareAndSwap("/foo/foo/foo", "car", 0, "bar", Permanent, 3, 1)
e = nonblockingRetrive(c)
if e.Key != "/foo/foo/foo" || e.Action != TestAndSet {
t.Fatal("watch for TestAndSet node fails")
if e.Key != "/foo/foo/foo" || e.Action != CompareAndSwap {
t.Fatal("watch for CompareAndSwap node fails")
}
c, _ = s.Watch("/foo/foo/foo", false, 0, 3, 1)
@ -353,7 +353,7 @@ func TestWatch(t *testing.T) {
// watch at a prefix
c, _ = s.Watch("/foo", true, 0, 4, 1)
s.Create("/foo/foo/boo", "bar", false, false, Permanent, 5, 1)
s.Create("/foo/foo/boo", "bar", false, Permanent, 5, 1)
e = nonblockingRetrive(c)
if e.Key != "/foo/foo/boo" || e.Action != Create {
t.Fatal("watch for Create subdirectory fails")
@ -367,10 +367,10 @@ func TestWatch(t *testing.T) {
}
c, _ = s.Watch("/foo", true, 0, 6, 1)
s.TestAndSet("/foo/foo/boo", "foo", 0, "bar", Permanent, 7, 1)
s.CompareAndSwap("/foo/foo/boo", "foo", 0, "bar", Permanent, 7, 1)
e = nonblockingRetrive(c)
if e.Key != "/foo/foo/boo" || e.Action != TestAndSet {
t.Fatal("watch for TestAndSet subdirectory fails")
if e.Key != "/foo/foo/boo" || e.Action != CompareAndSwap {
t.Fatal("watch for CompareAndSwap subdirectory fails")
}
c, _ = s.Watch("/foo", true, 0, 7, 1)
@ -381,7 +381,7 @@ func TestWatch(t *testing.T) {
}
// watch expire
s.Create("/foo/foo/boo", "foo", false, false, time.Now().Add(time.Second*1), 9, 1)
s.Create("/foo/foo/boo", "foo", false, time.Now().Add(time.Second*1), 9, 1)
c, _ = s.Watch("/foo", true, 0, 9, 1)
time.Sleep(time.Second * 2)
e = nonblockingRetrive(c)
@ -389,7 +389,7 @@ func TestWatch(t *testing.T) {
t.Fatal("watch for Expiration of Create() subdirectory fails ", e)
}
s.Create("/foo/foo/boo", "foo", false, false, Permanent, 10, 1)
s.Create("/foo/foo/boo", "foo", false, Permanent, 10, 1)
s.Update("/foo/foo/boo", "bar", time.Now().Add(time.Second*1), 11, 1)
c, _ = s.Watch("/foo", true, 0, 11, 1)
time.Sleep(time.Second * 2)
@ -398,13 +398,13 @@ func TestWatch(t *testing.T) {
t.Fatal("watch for Expiration of Update() subdirectory fails ", e)
}
s.Create("/foo/foo/boo", "foo", false, false, Permanent, 12, 1)
s.TestAndSet("/foo/foo/boo", "foo", 0, "bar", time.Now().Add(time.Second*1), 13, 1)
s.Create("/foo/foo/boo", "foo", false, Permanent, 12, 1)
s.CompareAndSwap("/foo/foo/boo", "foo", 0, "bar", time.Now().Add(time.Second*1), 13, 1)
c, _ = s.Watch("/foo", true, 0, 13, 1)
time.Sleep(time.Second * 2)
e = nonblockingRetrive(c)
if e.Key != "/foo/foo/boo" || e.Action != Expire || e.Index != 13 {
t.Fatal("watch for Expiration of TestAndSet() subdirectory fails ", e)
t.Fatal("watch for Expiration of CompareAndSwap() subdirectory fails ", e)
}
}
@ -416,7 +416,7 @@ func TestSort(t *testing.T) {
i := uint64(1)
for _, k := range keys {
_, err := s.Create(k, "bar", false, false, Permanent, i, 1)
_, err := s.Create(k, "bar", false, Permanent, i, 1)
if err != nil {
panic(err)
} else {
@ -454,7 +454,7 @@ func TestSaveAndRecover(t *testing.T) {
i := uint64(1)
for _, k := range keys {
_, err := s.Create(k, "bar", false, false, Permanent, i, 1)
_, err := s.Create(k, "bar", false, Permanent, i, 1)
if err != nil {
panic(err)
} else {
@ -466,7 +466,7 @@ func TestSaveAndRecover(t *testing.T) {
// test if we can reach the node before expiration
expire := time.Now().Add(time.Second)
s.Create("/foo/foo", "bar", false, false, expire, 1, 1)
s.Create("/foo/foo", "bar", false, expire, 1, 1)
b, err := s.Save()
cloneFs := newStore()
@ -522,7 +522,7 @@ func GenKeys(num int, depth int) []string {
}
func createAndGet(s *store, path string, t *testing.T) {
_, err := s.Create(path, "bar", false, false, Permanent, 1, 1)
_, err := s.Create(path, "bar", false, Permanent, 1, 1)
if err != nil {
t.Fatalf("cannot create %s=bar [%s]", path, err.Error())

View File

@ -1,17 +1,16 @@
package store
import (
"time"
"github.com/coreos/etcd/log"
"github.com/coreos/go-raft"
"time"
)
func init() {
raft.RegisterCommand(&UpdateCommand{})
}
// The UpdateCommand updates the value of a key in the Store.
// Update command
type UpdateCommand struct {
Key string `json:"key"`
Value string `json:"value"`
@ -23,7 +22,7 @@ func (c *UpdateCommand) CommandName() string {
return "etcd:update"
}
// Update node
// Create node
func (c *UpdateCommand) Apply(server raft.Server) (interface{}, error) {
s, _ := server.StateMachine().(Store)

View File

@ -2,9 +2,9 @@ package etcd
import (
"fmt"
"testing"
"net/url"
"net"
"net/url"
"testing"
)
// To pass this test, we need to create a cluster of 3 machines
@ -19,7 +19,7 @@ func TestSync(t *testing.T) {
t.Fatal("cannot sync machines")
}
for _, m := range(c.GetCluster()) {
for _, m := range c.GetCluster() {
u, err := url.Parse(m)
if err != nil {
t.Fatal(err)
@ -27,7 +27,7 @@ func TestSync(t *testing.T) {
if u.Scheme != "http" {
t.Fatal("scheme must be http")
}
host, _, err := net.SplitHostPort(u.Host)
if err != nil {
t.Fatal(err)

View File

@ -1,4 +1,3 @@
package main
import (

View File

@ -1,4 +1,5 @@
package log
// Copyright 2013, CoreOS, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
@ -42,7 +43,6 @@ func (logger *Logger) Logf(priority Priority, format string, v ...interface{}) {
logger.Log(priority, fmt.Sprintf(format, v...))
}
func (logger *Logger) Emergency(v ...interface{}) {
logger.Log(PriEmerg, v...)
}
@ -99,7 +99,6 @@ func (logger *Logger) Debugf(format string, v ...interface{}) {
logger.Log(PriDebug, fmt.Sprintf(format, v...))
}
func Emergency(v ...interface{}) {
defaultLogger.Log(PriEmerg, v...)
}
@ -158,57 +157,56 @@ func Debugf(format string, v ...interface{}) {
// Standard library log functions
func (logger *Logger)Fatalln (v ...interface{}) {
func (logger *Logger) Fatalln(v ...interface{}) {
logger.Log(PriCrit, v...)
os.Exit(1)
}
func (logger *Logger)Fatalf (format string, v ...interface{}) {
func (logger *Logger) Fatalf(format string, v ...interface{}) {
logger.Logf(PriCrit, format, v...)
os.Exit(1)
}
func (logger *Logger)Panicln (v ...interface{}) {
func (logger *Logger) Panicln(v ...interface{}) {
s := fmt.Sprint(v...)
logger.Log(PriErr, s)
panic(s)
}
func (logger *Logger)Panicf (format string, v ...interface{}) {
func (logger *Logger) Panicf(format string, v ...interface{}) {
s := fmt.Sprintf(format, v...)
logger.Log(PriErr, s)
panic(s)
}
func (logger *Logger)Println (v ...interface{}) {
func (logger *Logger) Println(v ...interface{}) {
logger.Log(PriInfo, v...)
}
func (logger *Logger)Printf (format string, v ...interface{}) {
func (logger *Logger) Printf(format string, v ...interface{}) {
logger.Logf(PriInfo, format, v...)
}
func Fatalln (v ...interface{}) {
func Fatalln(v ...interface{}) {
defaultLogger.Log(PriCrit, v...)
os.Exit(1)
}
func Fatalf (format string, v ...interface{}) {
func Fatalf(format string, v ...interface{}) {
defaultLogger.Logf(PriCrit, format, v...)
os.Exit(1)
}
func Panicln (v ...interface{}) {
func Panicln(v ...interface{}) {
s := fmt.Sprint(v...)
defaultLogger.Log(PriErr, s)
panic(s)
}
func Panicf (format string, v ...interface{}) {
func Panicf(format string, v ...interface{}) {
s := fmt.Sprintf(format, v...)
defaultLogger.Log(PriErr, s)
panic(s)
}
func Println (v ...interface{}) {
func Println(v ...interface{}) {
defaultLogger.Log(PriInfo, v...)
}
func Printf (format string, v ...interface{}) {
func Printf(format string, v ...interface{}) {
defaultLogger.Logf(PriInfo, format, v...)
}

View File

@ -1,4 +1,5 @@
package log
// Copyright 2013, CoreOS, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");

View File

@ -1,4 +1,5 @@
package log
// Copyright 2013, CoreOS, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");

View File

@ -1,4 +1,5 @@
package log
// Copyright 2013, CoreOS, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");

View File

@ -1,4 +1,5 @@
package log
// Copyright 2013, CoreOS, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");

View File

@ -24,7 +24,7 @@ func Files() []*os.File {
files := []*os.File(nil)
for fd := listenFdsStart; fd < listenFdsStart+nfds; fd++ {
syscall.CloseOnExec(fd)
files = append(files, os.NewFile(uintptr(fd), "LISTEN_FD_" + strconv.Itoa(fd)))
files = append(files, os.NewFile(uintptr(fd), "LISTEN_FD_"+strconv.Itoa(fd)))
}
return files
}

View File

@ -1,30 +0,0 @@
package web
import (
"code.google.com/p/go.net/websocket"
)
type connection struct {
// The websocket connection.
ws *websocket.Conn
// Buffered channel of outbound messages.
send chan string
}
func (c *connection) writer() {
for message := range c.send {
err := websocket.Message.Send(c.ws, message)
if err != nil {
break
}
}
c.ws.Close()
}
func wsHandler(ws *websocket.Conn) {
c := &connection{send: make(chan string, 256), ws: ws}
h.register <- c
defer func() { h.unregister <- c }()
c.writer()
}

View File

@ -1,28 +0,0 @@
#!/bin/sh
# this file is copied from doozerd.
set -e
munge() {
printf %s "$1" | tr . _ | tr -d -c '[:alnum:]_'
}
quote() {
sed 's/\\/\\\\/g' | sed 's/"/\\"/g' | sed 's/$/\\n/' | tr -d '\n'
}
pkg_path=$1 ; shift
file=$1 ; shift
pkg=`basename $pkg_path`
printf 'package %s\n' "$pkg"
printf '\n'
printf '// This file was generated from %s.\n' "$file"
printf '\n'
printf 'var '
munge "`basename $file`"
printf ' string = "'
quote
printf '"\n'

View File

@ -1,61 +0,0 @@
package web
type hub struct {
// status
open bool
// Registered connections.
connections map[*connection]bool
// Inbound messages from the connections.
broadcast chan string
// Register requests from the connections.
register chan *connection
// Unregister requests from connections.
unregister chan *connection
}
var h = hub{
open: false,
broadcast: make(chan string),
register: make(chan *connection),
unregister: make(chan *connection),
connections: make(map[*connection]bool),
}
func Hub() *hub {
return &h
}
func HubOpen() bool {
return h.open
}
func (h *hub) run() {
h.open = true
for {
select {
case c := <-h.register:
h.connections[c] = true
case c := <-h.unregister:
delete(h.connections, c)
close(c.send)
case m := <-h.broadcast:
for c := range h.connections {
select {
case c.send <- m:
default:
delete(h.connections, c)
close(c.send)
go c.ws.Close()
}
}
}
}
}
func (h *hub) Send(msg string) {
h.broadcast <- msg
}

View File

@ -1,5 +0,0 @@
package web
// This file was generated from index.html.
var index_html string = "<html>\n<head>\n<title>etcd Web Interface</title>\n<script type=\"text/javascript\" src=\"//ajax.googleapis.com/ajax/libs/jquery/1.10.1/jquery.min.js\"></script>\n<script type=\"text/javascript\">\n $(function() {\n\n var conn;\n var content = $(\"#content\");\n\n function update(response) {\n // if set\n if (response.action == \"SET\") {\n\n if (response.expiration > \"1970\") {\n t = response.key + \"=\" + response.value\n + \" \" + response.expiration\n } else {\n t = response.key + \"=\" + response.value\n }\n\n id = response.key.replace(new RegExp(\"/\", 'g'), \"\\\\/\");\n\n if ($(\"#store_\" + id).length == 0) {\n if (response.expiration > \"1970\") {\n t = response.key + \"=\" + response.value\n + \" \" + response.expiration\n } else {\n t = response.key + \"=\" + response.value\n }\n\n var e = $('<div id=\"store_' + response.key + '\"/>')\n .text(t)\n e.appendTo(content)\n }\n else {\n\n $(\"#store_\" + id)\n .text(t)\n }\n }\n // if delete\n else if (response.action == \"DELETE\") {\n id = response.key.replace(new RegExp(\"/\", 'g'), \"\\\\/\");\n\n $(\"#store_\" + id).remove()\n }\n }\n\n\n if (window[\"WebSocket\"]) {\n conn = new WebSocket(\"ws://{{.Address}}/ws\");\n conn.onclose = function(evt) {\n\n }\n conn.onmessage = function(evt) {\n var response = JSON.parse(evt.data)\n update(response)\n }\n } else {\n appendLog($(\"<div><b>Your browser does not support WebSockets.</b></div>\"))\n }\n });\n</script>\n</head>\n<body>\n <div id=\"leader\">Leader: {{.Leader}}</div>\n <div id=\"content\"></div>\n</body>\n</html>\n"

View File

@ -1,70 +0,0 @@
<html>
<head>
<title>etcd Web Interface</title>
<script type="text/javascript" src="//ajax.googleapis.com/ajax/libs/jquery/1.10.1/jquery.min.js"></script>
<script type="text/javascript">
$(function() {
var conn;
var content = $("#content");
function update(response) {
// if set
if (response.action == "SET") {
if (response.expiration > "1970") {
t = response.key + "=" + response.value
+ " " + response.expiration
} else {
t = response.key + "=" + response.value
}
id = response.key.replace(new RegExp("/", 'g'), "\\/");
if ($("#store_" + id).length == 0) {
if (response.expiration > "1970") {
t = response.key + "=" + response.value
+ " " + response.expiration
} else {
t = response.key + "=" + response.value
}
var e = $('<div id="store_' + response.key + '"/>')
.text(t)
e.appendTo(content)
}
else {
$("#store_" + id)
.text(t)
}
}
// if delete
else if (response.action == "DELETE") {
id = response.key.replace(new RegExp("/", 'g'), "\\/");
$("#store_" + id).remove()
}
}
if (window["WebSocket"]) {
conn = new WebSocket("ws://{{.Address}}/ws");
conn.onclose = function(evt) {
}
conn.onmessage = function(evt) {
var response = JSON.parse(evt.data)
update(response)
}
} else {
appendLog($("<div><b>Your browser does not support WebSockets.</b></div>"))
}
});
</script>
</head>
<body>
<div id="leader">Leader: {{.Leader}}</div>
<div id="content"></div>
</body>
</html>

View File

@ -1,50 +0,0 @@
package web
import (
"code.google.com/p/go.net/websocket"
"fmt"
"github.com/coreos/go-raft"
"html/template"
"net/http"
"net/url"
)
var mainTempl *template.Template
var mainPage *MainPage
type MainPage struct {
Leader string
Address string
}
func mainHandler(c http.ResponseWriter, req *http.Request) {
p := mainPage
mainTempl.Execute(c, p)
}
func Start(raftServer raft.Server, webURL string) {
u, _ := url.Parse(webURL)
webMux := http.NewServeMux()
server := &http.Server{
Handler: webMux,
Addr: u.Host,
}
mainPage = &MainPage{
Leader: raftServer.Leader(),
Address: u.Host,
}
mainTempl = template.Must(template.New("index.html").Parse(index_html))
go h.run()
webMux.HandleFunc("/", mainHandler)
webMux.Handle("/ws", websocket.Handler(wsHandler))
fmt.Printf("etcd web server [%s] listening on %s\n", raftServer.Name(), u)
server.ListenAndServe()
}