From cbb706cd479889a04977c26846ad3157eff98637 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Tue, 13 May 2014 11:08:03 -0400 Subject: [PATCH] bump(goraft/raft): c76c5d95 --- third_party/github.com/goraft/raft/command.go | 4 +++- third_party/github.com/goraft/raft/log_entry.go | 4 +++- third_party/github.com/goraft/raft/peer.go | 4 ++++ third_party/github.com/goraft/raft/server.go | 14 +++++++++++++- 4 files changed, 23 insertions(+), 3 deletions(-) diff --git a/third_party/github.com/goraft/raft/command.go b/third_party/github.com/goraft/raft/command.go index 5a92d6d40..934366261 100644 --- a/third_party/github.com/goraft/raft/command.go +++ b/third_party/github.com/goraft/raft/command.go @@ -56,7 +56,9 @@ func newCommand(name string, data []byte) (Command, error) { return nil, err } } else { - json.NewDecoder(bytes.NewReader(data)).Decode(copy) + if err := json.NewDecoder(bytes.NewReader(data)).Decode(copy); err != nil { + return nil, err + } } } diff --git a/third_party/github.com/goraft/raft/log_entry.go b/third_party/github.com/goraft/raft/log_entry.go index f186b5724..0692ca8fe 100644 --- a/third_party/github.com/goraft/raft/log_entry.go +++ b/third_party/github.com/goraft/raft/log_entry.go @@ -29,7 +29,9 @@ func newLogEntry(log *Log, event *ev, index uint64, term uint64, command Command return nil, err } } else { - json.NewEncoder(&buf).Encode(command) + if err := json.NewEncoder(&buf).Encode(command); err != nil { + return nil, err + } } } diff --git a/third_party/github.com/goraft/raft/peer.go b/third_party/github.com/goraft/raft/peer.go index df9e4b0c6..9b5c9dfa6 100644 --- a/third_party/github.com/goraft/raft/peer.go +++ b/third_party/github.com/goraft/raft/peer.go @@ -89,6 +89,8 @@ func (p *Peer) startHeartbeat() { p.stopChan = make(chan bool) c := make(chan bool) + p.setLastActivity(time.Now()) + p.server.routineGroup.Add(1) go func() { defer p.server.routineGroup.Done() @@ -99,6 +101,8 @@ func (p *Peer) startHeartbeat() { // Stops the peer heartbeat. func (p *Peer) stopHeartbeat(flush bool) { + p.setLastActivity(time.Time{}) + p.stopChan <- flush } diff --git a/third_party/github.com/goraft/raft/server.go b/third_party/github.com/goraft/raft/server.go index 71fbadeb2..8a9d05c15 100644 --- a/third_party/github.com/goraft/raft/server.go +++ b/third_party/github.com/goraft/raft/server.go @@ -334,6 +334,8 @@ func (s *server) IsLogEmpty() bool { // A list of all the log entries. This should only be used for debugging purposes. func (s *server) LogEntries() []*LogEntry { + s.log.mutex.RLock() + defer s.log.mutex.RUnlock() return s.log.entries } @@ -471,7 +473,9 @@ func (s *server) Start() error { return nil } -// Init initializes the raft server +// Init initializes the raft server. +// If there is no previous log file under the given path, Init() will create an empty log file. +// Otherwise, Init() will load in the log entries from the log file. func (s *server) Init() error { if s.Running() { return fmt.Errorf("raft.Server: Server already running[%v]", s.state) @@ -613,6 +617,10 @@ func (s *server) loop() { // Sends an event to the event loop to be processed. The function will wait // until the event is actually processed before returning. func (s *server) send(value interface{}) (interface{}, error) { + if !s.Running() { + return nil, StopError + } + event := &ev{target: value, c: make(chan error, 1)} select { case s.c <- event: @@ -628,6 +636,10 @@ func (s *server) send(value interface{}) (interface{}, error) { } func (s *server) sendAsync(value interface{}) { + if !s.Running() { + return + } + event := &ev{target: value, c: make(chan error, 1)} // try a non-blocking send first // in most cases, this should not be blocking