pkg/mock/mockserver: support restart

Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
release-3.4
Gyuho Lee 2018-04-01 17:52:26 -07:00
parent 657c2e15cc
commit f1aa428a38
1 changed files with 60 additions and 18 deletions

View File

@ -18,6 +18,7 @@ import (
"context" "context"
"fmt" "fmt"
"net" "net"
"sync"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb" pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
@ -26,45 +27,86 @@ import (
// MockServer provides a mocked out grpc server of the etcdserver interface. // MockServer provides a mocked out grpc server of the etcdserver interface.
type MockServer struct { type MockServer struct {
GrpcServer *grpc.Server ln net.Listener
Address string Address string
GrpcServer *grpc.Server
} }
// MockServers provides a cluster of mocket out gprc servers of the etcdserver interface. // MockServers provides a cluster of mocket out gprc servers of the etcdserver interface.
type MockServers []*MockServer type MockServers struct {
mu sync.RWMutex
Servers []*MockServer
wg sync.WaitGroup
}
// StartMockServers creates the desired count of mock servers // StartMockServers creates the desired count of mock servers
// and starts them. // and starts them.
func StartMockServers(count int) (svrs MockServers, err error) { func StartMockServers(count int) (ms *MockServers, err error) {
svrs = make(MockServers, count) ms = &MockServers{
Servers: make([]*MockServer, count),
wg: sync.WaitGroup{},
}
defer func() { defer func() {
if err != nil { if err != nil {
svrs.Stop() ms.Stop()
} }
}() }()
for idx := 0; idx < count; idx++ {
for i := 0; i < count; i++ { ln, err := net.Listen("tcp", "localhost:0")
listener, err := net.Listen("tcp", "localhost:0")
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to listen %v", err) return nil, fmt.Errorf("failed to listen %v", err)
} }
ms.Servers[idx] = &MockServer{ln: ln, Address: ln.Addr().String()}
ms.StartAt(idx)
}
return ms, nil
}
svr := grpc.NewServer() // StartAt restarts mock server at given index.
pb.RegisterKVServer(svr, &mockKVServer{}) func (ms *MockServers) StartAt(idx int) (err error) {
svrs[i] = &MockServer{GrpcServer: svr, Address: listener.Addr().String()} ms.mu.Lock()
go func(svr *grpc.Server, l net.Listener) { defer ms.mu.Unlock()
svr.Serve(l)
}(svr, listener) if ms.Servers[idx].ln == nil {
ms.Servers[idx].ln, err = net.Listen("tcp", ms.Servers[idx].Address)
if err != nil {
return fmt.Errorf("failed to listen %v", err)
}
} }
return svrs, nil svr := grpc.NewServer()
pb.RegisterKVServer(svr, &mockKVServer{})
ms.Servers[idx].GrpcServer = svr
go func(svr *grpc.Server, l net.Listener) {
ms.wg.Add(1)
svr.Serve(l)
}(ms.Servers[idx].GrpcServer, ms.Servers[idx].ln)
return nil
}
// StopAt stops mock server at given index.
func (ms *MockServers) StopAt(idx int) {
ms.mu.Lock()
defer ms.mu.Unlock()
if ms.Servers[idx].ln == nil {
return
}
ms.Servers[idx].GrpcServer.Stop()
ms.Servers[idx].GrpcServer = nil
ms.Servers[idx].ln = nil
ms.wg.Done()
} }
// Stop stops the mock server, immediately closing all open connections and listeners. // Stop stops the mock server, immediately closing all open connections and listeners.
func (svrs MockServers) Stop() { func (ms *MockServers) Stop() {
for _, svr := range svrs { for idx := range ms.Servers {
svr.GrpcServer.Stop() ms.StopAt(idx)
} }
ms.wg.Wait()
} }
type mockKVServer struct{} type mockKVServer struct{}