diff --git a/client_handlers.go b/client_handlers.go index 7ee302e46..cd659b0d5 100644 --- a/client_handlers.go +++ b/client_handlers.go @@ -315,6 +315,20 @@ func WatchHttpHandler(w http.ResponseWriter, req *http.Request) { } +// TestHandler +func TestHttpHandler(w http.ResponseWriter, req *http.Request) { + testType := req.URL.Path[len("/test/"):] + + if testType == "speed" { + directSet() + w.WriteHeader(http.StatusOK) + w.Write([]byte("speed test success")) + return + } + + w.WriteHeader(http.StatusBadRequest) +} + // Convert string duration to time format func durationToExpireTime(strDuration string) (time.Time, error) { if strDuration != "" { diff --git a/etcd.go b/etcd.go index 12901f91e..7a56b04cf 100644 --- a/etcd.go +++ b/etcd.go @@ -14,8 +14,10 @@ import ( "io/ioutil" "net" "net/http" + "net/url" "os" "os/signal" + "path" "runtime/pprof" "strings" "time" @@ -267,9 +269,6 @@ func startRaft(securityType int) { raftServer.Start() - // start to response to raft requests - go startRaftTransport(info.RaftPort, securityType) - if raftServer.IsLogEmpty() { // start as a leader in a new cluster @@ -339,6 +338,9 @@ func startRaft(securityType int) { go raftServer.Snapshot() } + // start to response to raft requests + go startRaftTransport(info.RaftPort, securityType) + } // Create transporter using by raft server @@ -436,6 +438,7 @@ func startClientTransport(port int, st int) { http.HandleFunc("/machines", MachinesHttpHandler) http.HandleFunc("/", VersionHttpHandler) http.HandleFunc("/stats", StatsHttpHandler) + http.HandleFunc("/test/", TestHttpHandler) switch st { @@ -624,15 +627,19 @@ func joinCluster(s *raft.Server, serverName string) error { return nil } if resp.StatusCode == http.StatusTemporaryRedirect { + address := resp.Header.Get("Location") debugf("Send Join Request to %s", address) - json.NewEncoder(&b).Encode(command) - segs := strings.Split(address, "://") - if len(segs) != 2 { - return fmt.Errorf("Unable to join: wrong redirection info") + u, err := url.Parse(address) + + if err != nil { + return fmt.Errorf("Unable to join: %s", err.Error()) } - path := segs[1] - resp, err = t.Post(path, &b) + + json.NewEncoder(&b).Encode(command) + + resp, err = t.Post(path.Join(u.Host, u.Path), &b) + } else if resp.StatusCode == http.StatusBadRequest { debug("Reach max number machines in the cluster") return fmt.Errorf(errors[103]) diff --git a/etcd_long_test.go b/etcd_long_test.go index 566a42bd5..db603b540 100644 --- a/etcd_long_test.go +++ b/etcd_long_test.go @@ -3,6 +3,7 @@ package main import ( "fmt" "math/rand" + "net/http" "os" "strconv" "strings" @@ -119,3 +120,22 @@ func TestKillRandom(t *testing.T) { <-leaderChan } + +func BenchmarkEtcdDirectCall(b *testing.B) { + procAttr := new(os.ProcAttr) + procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr} + + clusterSize := 3 + _, etcds, _ := createCluster(clusterSize, procAttr) + + defer destroyCluster(etcds) + + time.Sleep(time.Second) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + resp, _ := http.Get("http://0.0.0.0:4001/test/speed") + resp.Body.Close() + } + +} diff --git a/store/store.go b/store/store.go index 00ea1b926..5ef4bc2d4 100644 --- a/store/store.go +++ b/store/store.go @@ -21,6 +21,12 @@ type Store struct { // key-value store structure Tree *tree + // This mutex protects everything except add watcher member. + // Add watch member does not depend on the current state of the store. + // And watch will return when other protected function is called and reach + // the watching condition. + // It is needed so that clone() can atomically replicate the Store + // and do the log snapshot in a go routine. mutex sync.Mutex // WatcherHub is where we register all the clients diff --git a/store/tree_store_test.go b/store/tree_store_test.go index 99281f3ed..ad8222ffb 100644 --- a/store/tree_store_test.go +++ b/store/tree_store_test.go @@ -1,7 +1,7 @@ package store import ( - //"fmt" + "fmt" "math/rand" "strconv" "testing" @@ -64,20 +64,20 @@ func TestStoreGet(t *testing.T) { ts.set("/hello/fooo", NewTestNode("barbarbar")) ts.set("/hello/foooo/foo", NewTestNode("barbarbar")) - //nodes, keys, ok := ts.list("/hello") + nodes, keys, ok := ts.list("/hello") - // if !ok { - // t.Fatalf("cannot list!") - // } else { - // nodes, _ := nodes.([]*Node) - // length := len(nodes) + if !ok { + t.Fatalf("cannot list!") + } else { + nodes, _ := nodes.([]*Node) + length := len(nodes) - // for i := 0; i < length; i++ { - // fmt.Println(keys[i], "=", nodes[i].Value) - // } - // } + for i := 0; i < length; i++ { + fmt.Println(keys[i], "=", nodes[i].Value) + } + } - keys := GenKeys(100, 10) + keys = GenKeys(100, 10) for i := 0; i < 100; i++ { value := strconv.Itoa(rand.Int()) diff --git a/test.go b/test.go index b95abe5e1..02ce50c5a 100644 --- a/test.go +++ b/test.go @@ -166,6 +166,28 @@ func getLeader(addr string) (string, error) { } +func directSet() { + c := make(chan bool, 1000) + for i := 0; i < 1000; i++ { + go send(c) + } + + for i := 0; i < 1000; i++ { + <-c + } +} + +func send(c chan bool) { + for i := 0; i < 10; i++ { + command := &SetCommand{} + command.Key = "foo" + command.Value = "bar" + command.ExpireTime = time.Unix(0, 0) + raftServer.Do(command) + } + c <- true +} + // Dial with timeout func dialTimeoutFast(network, addr string) (net.Conn, error) { return net.DialTimeout(network, addr, time.Millisecond*10)