From a40a270e19fcd78925a5d18fdc2d19c940ad6a3a Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Mon, 29 Sep 2014 11:52:36 -0700 Subject: [PATCH] etcdserver: publish self info when start --- etcdserver/etcdhttp/http_test.go | 6 +++--- etcdserver/server.go | 15 +++++++++++++-- etcdserver/server_test.go | 18 +++++++++--------- main.go | 4 +++- 4 files changed, 28 insertions(+), 15 deletions(-) diff --git a/etcdserver/etcdhttp/http_test.go b/etcdserver/etcdhttp/http_test.go index 46ebfe7b4..962e91a1f 100644 --- a/etcdserver/etcdhttp/http_test.go +++ b/etcdserver/etcdhttp/http_test.go @@ -712,8 +712,8 @@ func (fs *errServer) Do(ctx context.Context, r etcdserverpb.Request) (etcdserver func (fs *errServer) Process(ctx context.Context, m raftpb.Message) error { return fs.err } -func (fs *errServer) Start() {} -func (fs *errServer) Stop() {} +func (fs *errServer) Start(m etcdserver.Member) {} +func (fs *errServer) Stop() {} // errReader implements io.Reader to facilitate a broken request. type errReader struct{} @@ -838,7 +838,7 @@ func (rs *resServer) Do(_ context.Context, _ etcdserverpb.Request) (etcdserver.R return rs.res, nil } func (rs *resServer) Process(_ context.Context, _ raftpb.Message) error { return nil } -func (rs *resServer) Start() {} +func (rs *resServer) Start(m etcdserver.Member) {} func (rs *resServer) Stop() {} func mustMarshalEvent(t *testing.T, ev *store.Event) string { diff --git a/etcdserver/server.go b/etcdserver/server.go index f9995004a..1052a20da 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -19,6 +19,8 @@ import ( const ( defaultSyncTimeout = time.Second DefaultSnapCount = 10000 + // TODO: calculated based on heartbeat interval + defaultPublishRetryInterval = 5 * time.Second ) var ( @@ -57,7 +59,7 @@ type Server interface { // begin serving requests. It must be called before Do or Process. // Start must be non-blocking; any long-running server functionality // should be implemented in goroutines. - Start() + Start(m Member) // Stop terminates the Server and performs any necessary finalization. // Do and Process cannot be called after Stop has been invoked. Stop() @@ -102,7 +104,16 @@ type EtcdServer struct { // Start prepares and starts server in a new goroutine. It is no longer safe to // modify a server's fields after it has been sent to Start. -func (s *EtcdServer) Start() { +// It also starts a goroutine to publish its server information. +func (s *EtcdServer) Start(m Member) { + s.start() + go s.publish(m, defaultPublishRetryInterval) +} + +// start prepares and starts server in a new goroutine. It is no longer safe to +// modify a server's fields after it has been sent to Start. +// This function is just used for testing. +func (s *EtcdServer) start() { if s.SnapCount == 0 { log.Printf("etcdserver: set snapshot count to default %d", DefaultSnapCount) s.SnapCount = DefaultSnapCount diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 624f65175..22a473c9b 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -400,7 +400,7 @@ func testServer(t *testing.T, ns int64) { Storage: &storageRecorder{}, Ticker: tk.C, } - srv.Start() + srv.start() // TODO(xiangli): randomize election timeout // then remove this sleep. time.Sleep(1 * time.Millisecond) @@ -469,7 +469,7 @@ func TestDoProposal(t *testing.T) { Storage: &storageRecorder{}, Ticker: tk, } - srv.Start() + srv.start() resp, err := srv.Do(ctx, tt) srv.Stop() @@ -539,7 +539,7 @@ func TestDoProposalStopped(t *testing.T) { Storage: &storageRecorder{}, Ticker: tk, } - srv.Start() + srv.start() done := make(chan struct{}) var err error @@ -639,7 +639,7 @@ func TestSyncTrigger(t *testing.T) { Storage: &storageRecorder{}, SyncTicker: st, } - srv.Start() + srv.start() // trigger the server to become a leader and accept sync requests n.readyc <- raft.Ready{ SoftState: &raft.SoftState{ @@ -710,7 +710,7 @@ func TestTriggerSnap(t *testing.T) { SnapCount: 10, } - s.Start() + s.start() for i := 0; int64(i) < s.SnapCount; i++ { s.Do(ctx, pb.Request{Method: "PUT", ID: 1}) } @@ -741,7 +741,7 @@ func TestRecvSnapshot(t *testing.T) { Node: n, } - s.Start() + s.start() n.readyc <- raft.Ready{Snapshot: raftpb.Snapshot{Index: 1}} // make goroutines move forward to receive snapshot pkg.ForceGosched() @@ -769,7 +769,7 @@ func TestRecvSlowSnapshot(t *testing.T) { Node: n, } - s.Start() + s.start() n.readyc <- raft.Ready{Snapshot: raftpb.Snapshot{Index: 1}} // make goroutines move forward to receive snapshot pkg.ForceGosched() @@ -794,7 +794,7 @@ func TestAddNode(t *testing.T) { Send: func(_ []raftpb.Message) {}, Storage: &storageRecorder{}, } - s.Start() + s.start() s.AddNode(context.TODO(), 1, []byte("foo")) gaction := n.Action() s.Stop() @@ -814,7 +814,7 @@ func TestRemoveNode(t *testing.T) { Send: func(_ []raftpb.Message) {}, Storage: &storageRecorder{}, } - s.Start() + s.start() s.RemoveNode(context.TODO(), 1) gaction := n.Action() s.Stop() diff --git a/main.go b/main.go index 8e3fac9d9..5cd98e5df 100644 --- a/main.go +++ b/main.go @@ -205,7 +205,9 @@ func startEtcd() { SnapCount: *snapCount, ClusterStore: cls, } - s.Start() + member := *self + member.ClientURLs = *addrs + s.Start(member) ch := &pkg.CORSHandler{ Handler: etcdhttp.NewClientHandler(s, cls, *timeout),