refactor(tests/server_utils): use etcd instance
Remove duplicated etcd start code.release-0.4
parent
b0ffb4fd10
commit
7dce4c8fbb
|
@ -72,4 +72,7 @@ func (h *CORSHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
|||
}
|
||||
|
||||
h.Handler.ServeHTTP(w, req)
|
||||
|
||||
// Flush before leaving to send out all data.
|
||||
w.(http.Flusher).Flush()
|
||||
}
|
||||
|
|
|
@ -1,17 +1,9 @@
|
|||
package tests
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"os"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/third_party/github.com/goraft/raft"
|
||||
|
||||
"github.com/coreos/etcd/metrics"
|
||||
"github.com/coreos/etcd/config"
|
||||
"github.com/coreos/etcd/etcd"
|
||||
"github.com/coreos/etcd/server"
|
||||
"github.com/coreos/etcd/store"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -19,94 +11,30 @@ const (
|
|||
testClientURL = "localhost:4401"
|
||||
testRaftURL = "localhost:7701"
|
||||
testSnapshotCount = 10000
|
||||
testHeartbeatInterval = time.Duration(50) * time.Millisecond
|
||||
testElectionTimeout = time.Duration(200) * time.Millisecond
|
||||
testHeartbeatInterval = 50
|
||||
testElectionTimeout = 200
|
||||
testDataDir = "/tmp/ETCDTEST"
|
||||
)
|
||||
|
||||
// Starts a server in a temporary directory.
|
||||
// Starts a new server.
|
||||
func RunServer(f func(*server.Server)) {
|
||||
path, _ := ioutil.TempDir("", "etcd-")
|
||||
defer os.RemoveAll(path)
|
||||
c := config.New()
|
||||
|
||||
store := store.New()
|
||||
registry := server.NewRegistry(store)
|
||||
c.Name = testName
|
||||
c.Addr = testClientURL
|
||||
c.Peer.Addr = testRaftURL
|
||||
|
||||
serverStats := server.NewRaftServerStats(testName)
|
||||
followersStats := server.NewRaftFollowersStats(testName)
|
||||
c.DataDir = testDataDir
|
||||
c.Force = true
|
||||
|
||||
psConfig := server.PeerServerConfig{
|
||||
Name: testName,
|
||||
URL: "http://" + testRaftURL,
|
||||
Scheme: "http",
|
||||
SnapshotCount: testSnapshotCount,
|
||||
}
|
||||
|
||||
mb := metrics.NewBucket("")
|
||||
|
||||
ps := server.NewPeerServer(psConfig, registry, store, &mb, followersStats, serverStats)
|
||||
psListener := server.NewListener("http", testRaftURL, nil)
|
||||
|
||||
// Create Raft transporter and server
|
||||
dialTimeout := (3 * testHeartbeatInterval) + testElectionTimeout
|
||||
responseHeaderTimeout := (3 * testHeartbeatInterval) + testElectionTimeout
|
||||
raftTransporter := server.NewTransporter(followersStats, serverStats, registry, testHeartbeatInterval, dialTimeout, responseHeaderTimeout)
|
||||
raftServer, err := raft.NewServer(testName, path, raftTransporter, store, ps, "")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
raftServer.SetElectionTimeout(testElectionTimeout)
|
||||
raftServer.SetHeartbeatInterval(testHeartbeatInterval)
|
||||
ps.SetRaftServer(raftServer)
|
||||
|
||||
s := server.New(testName, "http://"+testClientURL, ps, registry, store, nil)
|
||||
sListener := server.NewListener("http", testClientURL, nil)
|
||||
|
||||
ps.SetServer(s)
|
||||
|
||||
w := &sync.WaitGroup{}
|
||||
|
||||
// Start up peer server.
|
||||
c := make(chan bool)
|
||||
go func() {
|
||||
c <- true
|
||||
ps.Start(false, "", []string{})
|
||||
h := waitHandler{w, ps.HTTPHandler()}
|
||||
http.Serve(psListener, &h)
|
||||
}()
|
||||
<-c
|
||||
|
||||
// Start up etcd server.
|
||||
go func() {
|
||||
c <- true
|
||||
h := waitHandler{w, s.HTTPHandler()}
|
||||
http.Serve(sListener, &h)
|
||||
}()
|
||||
<-c
|
||||
|
||||
// Wait to make sure servers have started.
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
c.Peer.HeartbeatInterval = testHeartbeatInterval
|
||||
c.Peer.ElectionTimeout = testElectionTimeout
|
||||
c.SnapshotCount = testSnapshotCount
|
||||
|
||||
i := etcd.New(c)
|
||||
go i.Run()
|
||||
<-i.ReadyNotify()
|
||||
// Execute the function passed in.
|
||||
f(s)
|
||||
|
||||
// Clean up servers.
|
||||
ps.Stop()
|
||||
psListener.Close()
|
||||
sListener.Close()
|
||||
w.Wait()
|
||||
}
|
||||
|
||||
type waitHandler struct {
|
||||
wg *sync.WaitGroup
|
||||
handler http.Handler
|
||||
}
|
||||
|
||||
func (h *waitHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
h.wg.Add(1)
|
||||
defer h.wg.Done()
|
||||
h.handler.ServeHTTP(w, r)
|
||||
|
||||
//important to flush before decrementing the wait group.
|
||||
//we won't get a chance to once main() ends.
|
||||
w.(http.Flusher).Flush()
|
||||
f(i.Server)
|
||||
i.Stop()
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue