etcdserver: initialize raftNode with constructor

raftNode was being initialized in start(), which was causing
hangs when trying to stop the etcd server since the stop channel
would not be initialized in time for the stop call. Instead,
set up the non-configurable bits in a constructor.

Fixes #7668
release-3.2
Anthony Romano 2017-04-17 16:32:36 -07:00
parent 8fdf8f752b
commit 714b48a4b4
4 changed files with 166 additions and 155 deletions

View File

@ -94,14 +94,7 @@ type raftNode struct {
term uint64
lead uint64
mu sync.Mutex
// last lead elected time
lt time.Time
// to check if msg receiver is removed from cluster
isIDRemoved func(id uint64) bool
raft.Node
raftNodeConfig
// a chan to send/receive snapshot
msgSnapC chan raftpb.Message
@ -115,26 +108,49 @@ type raftNode struct {
// utility
ticker *time.Ticker
// contention detectors for raft heartbeat message
td *contention.TimeoutDetector
heartbeat time.Duration // for logging
raftStorage *raft.MemoryStorage
storage Storage
// transport specifies the transport to send and receive msgs to members.
// Sending messages MUST NOT block. It is okay to drop messages, since
// clients should timeout and reissue their messages.
// If transport is nil, server will panic.
transport rafthttp.Transporter
td *contention.TimeoutDetector
stopped chan struct{}
done chan struct{}
}
// raftNodeConfig groups the caller-supplied settings of a raftNode. It is the
// sole argument of newRaftNode, which stores it in the resulting raftNode as
// an embedded field.
type raftNodeConfig struct {
// to check if msg receiver is removed from cluster
isIDRemoved func(id uint64) bool
// embedded raft.Node; its methods are promoted through this struct
raft.Node
raftStorage *raft.MemoryStorage
storage Storage
// heartbeat is also read by newRaftNode: it is the ticker period and,
// doubled, the timeout passed to the contention detector.
heartbeat time.Duration // for logging
// transport specifies the transport to send and receive msgs to members.
// Sending messages MUST NOT block. It is okay to drop messages, since
// clients should timeout and reissue their messages.
// If transport is nil, server will panic.
transport rafthttp.Transporter
}
// newRaftNode constructs a raftNode around cfg. Besides storing the supplied
// configuration it allocates everything the caller does not configure: the
// heartbeat contention detector, the read-state, snapshot-message and apply
// channels, the stopped/done channels, and the ticker.
func newRaftNode(cfg raftNodeConfig) *raftNode {
	node := new(raftNode)
	node.raftNodeConfig = cfg

	// The contention detector watches raft heartbeat messages; a heartbeat
	// is expected to be sent within 2 heartbeat intervals.
	node.td = contention.NewTimeoutDetector(2 * cfg.heartbeat)

	node.readStateC = make(chan raft.ReadState, 1)
	node.msgSnapC = make(chan raftpb.Message, maxInFlightMsgSnap)
	node.applyc = make(chan apply)
	node.stopped = make(chan struct{})
	node.done = make(chan struct{})

	// Only a non-zero heartbeat gets a real ticker from time.NewTicker; a
	// zero heartbeat gets a zero-value Ticker instead.
	if node.heartbeat != 0 {
		node.ticker = time.NewTicker(node.heartbeat)
	} else {
		node.ticker = &time.Ticker{}
	}
	return node
}
// start prepares and starts raftNode in a new goroutine. It is no longer safe
// to modify the fields after it has been started.
func (r *raftNode) start(rh *raftReadyHandler) {
r.applyc = make(chan apply)
r.stopped = make(chan struct{})
r.done = make(chan struct{})
internalTimeout := time.Second
go func() {
@ -147,10 +163,8 @@ func (r *raftNode) start(rh *raftReadyHandler) {
r.Tick()
case rd := <-r.Ready():
if rd.SoftState != nil {
if lead := atomic.LoadUint64(&r.lead); rd.SoftState.Lead != raft.None && lead != rd.SoftState.Lead {
r.mu.Lock()
r.lt = time.Now()
r.mu.Unlock()
newLeader := rd.SoftState.Lead != raft.None && atomic.LoadUint64(&r.lead) != rd.SoftState.Lead
if newLeader {
leaderChanges.Inc()
}
@ -162,7 +176,8 @@ func (r *raftNode) start(rh *raftReadyHandler) {
atomic.StoreUint64(&r.lead, rd.SoftState.Lead)
islead = rd.RaftState == raft.StateLeader
rh.updateLeadership()
rh.updateLeadership(newLeader)
r.td.Reset()
}
if len(rd.ReadStates) != 0 {
@ -316,12 +331,6 @@ func (r *raftNode) apply() chan apply {
return r.applyc
}
// leadElectedTime returns the last lead elected time (r.lt), reading it while
// r.mu is held.
func (r *raftNode) leadElectedTime() time.Time {
	r.mu.Lock()
	electedAt := r.lt
	r.mu.Unlock()
	return electedAt
}
func (r *raftNode) stop() {
r.stopped <- struct{}{}
<-r.done

View File

@ -153,13 +153,13 @@ func TestCreateConfigChangeEnts(t *testing.T) {
func TestStopRaftWhenWaitingForApplyDone(t *testing.T) {
n := newNopReadyNode()
srv := &EtcdServer{r: raftNode{
r := newRaftNode(raftNodeConfig{
Node: n,
storage: mockstorage.NewStorageRecorder(""),
raftStorage: raft.NewMemoryStorage(),
transport: rafthttp.NewNopTransporter(),
ticker: &time.Ticker{},
}}
})
srv := &EtcdServer{r: *r}
srv.r.start(nil)
n.readyc <- raft.Ready{}
select {
@ -182,16 +182,16 @@ func TestConfgChangeBlocksApply(t *testing.T) {
waitApplyc := make(chan struct{})
srv := &EtcdServer{r: raftNode{
r := newRaftNode(raftNodeConfig{
Node: n,
storage: mockstorage.NewStorageRecorder(""),
raftStorage: raft.NewMemoryStorage(),
transport: rafthttp.NewNopTransporter(),
ticker: &time.Ticker{},
}}
})
srv := &EtcdServer{r: *r}
rh := &raftReadyHandler{
updateLeadership: func() {},
updateLeadership: func(bool) {},
waitForApply: func() {
<-waitApplyc
},

View File

@ -41,7 +41,6 @@ import (
"github.com/coreos/etcd/lease"
"github.com/coreos/etcd/mvcc"
"github.com/coreos/etcd/mvcc/backend"
"github.com/coreos/etcd/pkg/contention"
"github.com/coreos/etcd/pkg/fileutil"
"github.com/coreos/etcd/pkg/idutil"
"github.com/coreos/etcd/pkg/pbutil"
@ -243,6 +242,9 @@ type EtcdServer struct {
// on etcd server shutdown.
ctx context.Context
cancel context.CancelFunc
leadTimeMu sync.RWMutex
leadElectedTime time.Time
}
// NewServer creates a new EtcdServer from the supplied configuration. The
@ -419,19 +421,15 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
snapCount: cfg.SnapCount,
errorc: make(chan error, 1),
store: st,
r: raftNode{
isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) },
Node: n,
ticker: time.NewTicker(heartbeat),
// set up contention detectors for raft heartbeat message.
// expect to send a heartbeat within 2 heartbeat intervals.
td: contention.NewTimeoutDetector(2 * heartbeat),
heartbeat: heartbeat,
raftStorage: s,
storage: NewStorage(w, ss),
msgSnapC: make(chan raftpb.Message, maxInFlightMsgSnap),
readStateC: make(chan raft.ReadState, 1),
},
r: *newRaftNode(
raftNodeConfig{
isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) },
Node: n,
heartbeat: heartbeat,
raftStorage: s,
storage: NewStorage(w, ss),
},
),
id: id,
attributes: membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
cluster: cl,
@ -614,7 +612,7 @@ type etcdProgress struct {
// and helps decouple state machine logic from Raft algorithms.
// TODO: add a state machine interface to apply the commit entries and do snapshot/recover
type raftReadyHandler struct {
updateLeadership func()
updateLeadership func(newLeader bool)
updateCommittedIndex func(uint64)
waitForApply func()
}
@ -644,7 +642,7 @@ func (s *EtcdServer) run() {
return
}
rh := &raftReadyHandler{
updateLeadership: func() {
updateLeadership: func(newLeader bool) {
if !s.isLeader() {
if s.lessor != nil {
s.lessor.Demote()
@ -654,6 +652,12 @@ func (s *EtcdServer) run() {
}
setSyncC(nil)
} else {
if newLeader {
t := time.Now()
s.leadTimeMu.Lock()
s.leadElectedTime = t
s.leadTimeMu.Unlock()
}
setSyncC(s.SyncTicker.C)
if s.compactor != nil {
s.compactor.Resume()
@ -665,9 +669,6 @@ func (s *EtcdServer) run() {
if s.stats != nil {
s.stats.BecomeLeader()
}
if s.r.td != nil {
s.r.td.Reset()
}
},
updateCommittedIndex: func(ci uint64) {
cci := s.getCommittedIndex()
@ -1580,7 +1581,9 @@ func (s *EtcdServer) parseProposeCtxErr(err error, start time.Time) error {
case context.Canceled:
return ErrCanceled
case context.DeadlineExceeded:
curLeadElected := s.r.leadElectedTime()
s.leadTimeMu.RLock()
curLeadElected := s.leadElectedTime
s.leadTimeMu.RUnlock()
prevLeadLost := curLeadElected.Add(-2 * time.Duration(s.Cfg.ElectionTicks) * time.Duration(s.Cfg.TickMs) * time.Millisecond)
if start.After(prevLeadLost) && start.Before(curLeadElected) {
return ErrTimeoutDueToLeaderFail

View File

@ -167,14 +167,14 @@ func TestApplyRepeat(t *testing.T) {
st := store.New()
cl.SetStore(store.New())
cl.AddMember(&membership.Member{ID: 1234})
r := newRaftNode(raftNodeConfig{
Node: n,
raftStorage: raft.NewMemoryStorage(),
storage: mockstorage.NewStorageRecorder(""),
transport: rafthttp.NewNopTransporter(),
})
s := &EtcdServer{
r: raftNode{
Node: n,
raftStorage: raft.NewMemoryStorage(),
storage: mockstorage.NewStorageRecorder(""),
transport: rafthttp.NewNopTransporter(),
ticker: &time.Ticker{},
},
r: *r,
Cfg: &ServerConfig{},
store: st,
cluster: cl,
@ -525,7 +525,7 @@ func TestApplyConfChangeError(t *testing.T) {
for i, tt := range tests {
n := newNodeRecorder()
srv := &EtcdServer{
r: raftNode{Node: n},
r: *newRaftNode(raftNodeConfig{Node: n}),
cluster: cl,
Cfg: &ServerConfig{},
}
@ -552,12 +552,13 @@ func TestApplyConfChangeShouldStop(t *testing.T) {
for i := 1; i <= 3; i++ {
cl.AddMember(&membership.Member{ID: types.ID(i)})
}
r := newRaftNode(raftNodeConfig{
Node: newNodeNop(),
transport: rafthttp.NewNopTransporter(),
})
srv := &EtcdServer{
id: 1,
r: raftNode{
Node: newNodeNop(),
transport: rafthttp.NewNopTransporter(),
},
id: 1,
r: *r,
cluster: cl,
}
cc := raftpb.ConfChange{
@ -592,12 +593,13 @@ func TestApplyMultiConfChangeShouldStop(t *testing.T) {
for i := 1; i <= 5; i++ {
cl.AddMember(&membership.Member{ID: types.ID(i)})
}
r := newRaftNode(raftNodeConfig{
Node: newNodeNop(),
transport: rafthttp.NewNopTransporter(),
})
srv := &EtcdServer{
id: 2,
r: raftNode{
Node: newNodeNop(),
transport: rafthttp.NewNopTransporter(),
},
id: 2,
r: *r,
cluster: cl,
w: wait.New(),
}
@ -630,15 +632,15 @@ func TestDoProposal(t *testing.T) {
}
for i, tt := range tests {
st := mockstore.NewRecorder()
r := newRaftNode(raftNodeConfig{
Node: newNodeCommitter(),
storage: mockstorage.NewStorageRecorder(""),
raftStorage: raft.NewMemoryStorage(),
transport: rafthttp.NewNopTransporter(),
})
srv := &EtcdServer{
Cfg: &ServerConfig{TickMs: 1},
r: raftNode{
Node: newNodeCommitter(),
storage: mockstorage.NewStorageRecorder(""),
raftStorage: raft.NewMemoryStorage(),
transport: rafthttp.NewNopTransporter(),
ticker: &time.Ticker{},
},
Cfg: &ServerConfig{TickMs: 1},
r: *r,
store: st,
reqIDGen: idutil.NewGenerator(0, time.Time{}),
SyncTicker: &time.Ticker{},
@ -666,7 +668,7 @@ func TestDoProposalCancelled(t *testing.T) {
wt := mockwait.NewRecorder()
srv := &EtcdServer{
Cfg: &ServerConfig{TickMs: 1},
r: raftNode{Node: newNodeNop()},
r: *newRaftNode(raftNodeConfig{Node: newNodeNop()}),
w: wt,
reqIDGen: idutil.NewGenerator(0, time.Time{}),
}
@ -688,7 +690,7 @@ func TestDoProposalCancelled(t *testing.T) {
func TestDoProposalTimeout(t *testing.T) {
srv := &EtcdServer{
Cfg: &ServerConfig{TickMs: 1},
r: raftNode{Node: newNodeNop()},
r: *newRaftNode(raftNodeConfig{Node: newNodeNop()}),
w: mockwait.NewNop(),
reqIDGen: idutil.NewGenerator(0, time.Time{}),
}
@ -704,7 +706,7 @@ func TestDoProposalTimeout(t *testing.T) {
func TestDoProposalStopped(t *testing.T) {
srv := &EtcdServer{
Cfg: &ServerConfig{TickMs: 1},
r: raftNode{Node: newNodeNop()},
r: *newRaftNode(raftNodeConfig{Node: newNodeNop()}),
w: mockwait.NewNop(),
reqIDGen: idutil.NewGenerator(0, time.Time{}),
}
@ -723,7 +725,7 @@ func TestSync(t *testing.T) {
n := newNodeRecorder()
ctx, cancel := context.WithCancel(context.TODO())
srv := &EtcdServer{
r: raftNode{Node: n},
r: *newRaftNode(raftNodeConfig{Node: n}),
reqIDGen: idutil.NewGenerator(0, time.Time{}),
ctx: ctx,
cancel: cancel,
@ -766,7 +768,7 @@ func TestSyncTimeout(t *testing.T) {
n := newProposalBlockerRecorder()
ctx, cancel := context.WithCancel(context.TODO())
srv := &EtcdServer{
r: raftNode{Node: n},
r: *newRaftNode(raftNodeConfig{Node: n}),
reqIDGen: idutil.NewGenerator(0, time.Time{}),
ctx: ctx,
cancel: cancel,
@ -799,15 +801,16 @@ func TestSyncTrigger(t *testing.T) {
n := newReadyNode()
st := make(chan time.Time, 1)
tk := &time.Ticker{C: st}
r := newRaftNode(raftNodeConfig{
Node: n,
raftStorage: raft.NewMemoryStorage(),
transport: rafthttp.NewNopTransporter(),
storage: mockstorage.NewStorageRecorder(""),
})
srv := &EtcdServer{
Cfg: &ServerConfig{TickMs: 1},
r: raftNode{
Node: n,
raftStorage: raft.NewMemoryStorage(),
transport: rafthttp.NewNopTransporter(),
storage: mockstorage.NewStorageRecorder(""),
ticker: &time.Ticker{},
},
Cfg: &ServerConfig{TickMs: 1},
r: *r,
store: mockstore.NewNop(),
SyncTicker: tk,
reqIDGen: idutil.NewGenerator(0, time.Time{}),
@ -858,13 +861,14 @@ func TestSnapshot(t *testing.T) {
s.Append([]raftpb.Entry{{Index: 1}})
st := mockstore.NewRecorderStream()
p := mockstorage.NewStorageRecorderStream("")
r := newRaftNode(raftNodeConfig{
Node: newNodeNop(),
raftStorage: s,
storage: p,
})
srv := &EtcdServer{
Cfg: &ServerConfig{},
r: raftNode{
Node: newNodeNop(),
raftStorage: s,
storage: p,
},
Cfg: &ServerConfig{},
r: *r,
store: st,
}
srv.kv = mvcc.New(be, &lease.FakeLessor{}, &srv.consistIndex)
@ -914,16 +918,16 @@ func TestTriggerSnap(t *testing.T) {
snapc := 10
st := mockstore.NewRecorder()
p := mockstorage.NewStorageRecorderStream("")
r := newRaftNode(raftNodeConfig{
Node: newNodeCommitter(),
raftStorage: raft.NewMemoryStorage(),
storage: p,
transport: rafthttp.NewNopTransporter(),
})
srv := &EtcdServer{
Cfg: &ServerConfig{TickMs: 1},
snapCount: uint64(snapc),
r: raftNode{
Node: newNodeCommitter(),
raftStorage: raft.NewMemoryStorage(),
storage: p,
transport: rafthttp.NewNopTransporter(),
ticker: &time.Ticker{},
},
Cfg: &ServerConfig{TickMs: 1},
snapCount: uint64(snapc),
r: *r,
store: st,
reqIDGen: idutil.NewGenerator(0, time.Time{}),
SyncTicker: &time.Ticker{},
@ -962,10 +966,6 @@ func TestTriggerSnap(t *testing.T) {
// TestConcurrentApplyAndSnapshotV3 will send out snapshots concurrently with
// proposals.
func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
const (
// snapshots that may queue up at once without dropping
maxInFlightMsgSnap = 16
)
n := newNopReadyNode()
st := store.New()
cl := membership.NewCluster("abc")
@ -982,19 +982,18 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
rs := raft.NewMemoryStorage()
tr, snapDoneC := rafthttp.NewSnapTransporter(testdir)
r := newRaftNode(raftNodeConfig{
isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) },
Node: n,
transport: tr,
storage: mockstorage.NewStorageRecorder(testdir),
raftStorage: rs,
})
s := &EtcdServer{
Cfg: &ServerConfig{
DataDir: testdir,
},
r: raftNode{
isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) },
Node: n,
transport: tr,
storage: mockstorage.NewStorageRecorder(testdir),
raftStorage: rs,
msgSnapC: make(chan raftpb.Message, maxInFlightMsgSnap),
ticker: &time.Ticker{},
},
r: *r,
store: st,
cluster: cl,
SyncTicker: &time.Ticker{},
@ -1069,14 +1068,14 @@ func TestAddMember(t *testing.T) {
cl := newTestCluster(nil)
st := store.New()
cl.SetStore(st)
r := newRaftNode(raftNodeConfig{
Node: n,
raftStorage: raft.NewMemoryStorage(),
storage: mockstorage.NewStorageRecorder(""),
transport: rafthttp.NewNopTransporter(),
})
s := &EtcdServer{
r: raftNode{
Node: n,
raftStorage: raft.NewMemoryStorage(),
storage: mockstorage.NewStorageRecorder(""),
transport: rafthttp.NewNopTransporter(),
ticker: &time.Ticker{},
},
r: *r,
Cfg: &ServerConfig{},
store: st,
cluster: cl,
@ -1111,14 +1110,14 @@ func TestRemoveMember(t *testing.T) {
st := store.New()
cl.SetStore(store.New())
cl.AddMember(&membership.Member{ID: 1234})
r := newRaftNode(raftNodeConfig{
Node: n,
raftStorage: raft.NewMemoryStorage(),
storage: mockstorage.NewStorageRecorder(""),
transport: rafthttp.NewNopTransporter(),
})
s := &EtcdServer{
r: raftNode{
Node: n,
raftStorage: raft.NewMemoryStorage(),
storage: mockstorage.NewStorageRecorder(""),
transport: rafthttp.NewNopTransporter(),
ticker: &time.Ticker{},
},
r: *r,
Cfg: &ServerConfig{},
store: st,
cluster: cl,
@ -1152,14 +1151,14 @@ func TestUpdateMember(t *testing.T) {
st := store.New()
cl.SetStore(st)
cl.AddMember(&membership.Member{ID: 1234})
r := newRaftNode(raftNodeConfig{
Node: n,
raftStorage: raft.NewMemoryStorage(),
storage: mockstorage.NewStorageRecorder(""),
transport: rafthttp.NewNopTransporter(),
})
s := &EtcdServer{
r: raftNode{
Node: n,
raftStorage: raft.NewMemoryStorage(),
storage: mockstorage.NewStorageRecorder(""),
transport: rafthttp.NewNopTransporter(),
ticker: &time.Ticker{},
},
r: *r,
store: st,
cluster: cl,
reqIDGen: idutil.NewGenerator(0, time.Time{}),
@ -1196,7 +1195,7 @@ func TestPublish(t *testing.T) {
readych: make(chan struct{}),
Cfg: &ServerConfig{TickMs: 1},
id: 1,
r: raftNode{Node: n, ticker: &time.Ticker{}},
r: *newRaftNode(raftNodeConfig{Node: n}),
attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://a", "http://b"}},
cluster: &membership.RaftCluster{},
w: w,
@ -1239,13 +1238,13 @@ func TestPublish(t *testing.T) {
// TestPublishStopped tests that publish will be stopped if server is stopped.
func TestPublishStopped(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO())
r := newRaftNode(raftNodeConfig{
Node: newNodeNop(),
transport: rafthttp.NewNopTransporter(),
})
srv := &EtcdServer{
Cfg: &ServerConfig{TickMs: 1},
r: raftNode{
Node: newNodeNop(),
transport: rafthttp.NewNopTransporter(),
ticker: &time.Ticker{},
},
Cfg: &ServerConfig{TickMs: 1},
r: *r,
cluster: &membership.RaftCluster{},
w: mockwait.NewNop(),
done: make(chan struct{}),
@ -1267,7 +1266,7 @@ func TestPublishRetry(t *testing.T) {
n := newNodeRecorderStream()
srv := &EtcdServer{
Cfg: &ServerConfig{TickMs: 1},
r: raftNode{Node: n, ticker: &time.Ticker{}},
r: *newRaftNode(raftNodeConfig{Node: n}),
w: mockwait.NewNop(),
stopping: make(chan struct{}),
reqIDGen: idutil.NewGenerator(0, time.Time{}),
@ -1308,7 +1307,7 @@ func TestUpdateVersion(t *testing.T) {
srv := &EtcdServer{
id: 1,
Cfg: &ServerConfig{TickMs: 1},
r: raftNode{Node: n, ticker: &time.Ticker{}},
r: *newRaftNode(raftNodeConfig{Node: n}),
attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://node1.com"}},
cluster: &membership.RaftCluster{},
w: w,