From 8d245b546f93761be35b465e189f53df78be723e Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Thu, 12 Sep 2013 13:17:19 -0400 Subject: [PATCH 1/7] cleanup --- command.go | 7 +++++-- etcd_handlers.go | 43 ++++++++++++++++++++++++------------------- raft_server.go | 25 ++++++++++++------------- raft_stats.go | 35 ++++++++++++++++++++++------------- transporter.go | 4 ++-- util.go | 17 +++++++++++++++++ 6 files changed, 82 insertions(+), 49 deletions(-) diff --git a/command.go b/command.go index 0147811da..dc0684707 100644 --- a/command.go +++ b/command.go @@ -170,8 +170,9 @@ func (c *JoinCommand) Apply(raftServer *raft.Server) (interface{}, error) { value := fmt.Sprintf("raft=%s&etcd=%s&raftVersion=%s", c.RaftURL, c.EtcdURL, c.RaftVersion) etcdStore.Set(key, value, time.Unix(0, 0), raftServer.CommitIndex()) + // add peer stats if c.Name != r.Name() { - r.peersStats[c.Name] = &raftPeerStats{MinLatency: 1 << 63} + r.peersStats.Peers[c.Name] = &raftPeerStats{MinLatency: 1 << 63} } return b, err @@ -198,7 +199,9 @@ func (c *RemoveCommand) Apply(raftServer *raft.Server) (interface{}, error) { key := path.Join("_etcd/machines", c.Name) _, err := etcdStore.Delete(key, raftServer.CommitIndex()) - delete(r.peersStats, c.Name) + + // delete from stats + delete(r.peersStats.Peers, c.Name) if err != nil { return []byte{0}, err diff --git a/etcd_handlers.go b/etcd_handlers.go index b7cf0e791..24d7bb8aa 100644 --- a/etcd_handlers.go +++ b/etcd_handlers.go @@ -22,7 +22,7 @@ func NewEtcdMuxer() *http.ServeMux { etcdMux.Handle("/"+version+"/watch/", errorHandler(WatchHttpHandler)) etcdMux.Handle("/"+version+"/leader", errorHandler(LeaderHttpHandler)) etcdMux.Handle("/"+version+"/machines", errorHandler(MachinesHttpHandler)) - etcdMux.Handle("/"+version+"/stats", errorHandler(StatsHttpHandler)) + etcdMux.Handle("/"+version+"/stats/", errorHandler(StatsHttpHandler)) etcdMux.Handle("/version", errorHandler(VersionHttpHandler)) etcdMux.HandleFunc("/test/", TestHttpHandler) return etcdMux @@ -167,22 +167,8 @@ func dispatch(c Command, w http.ResponseWriter, req *http.Request, etcd bool) er return etcdErr.NewError(300, "") } - // tell the client where is the leader - path := req.URL.Path + redirect(leader, etcd, w, req) - var url string - - if etcd { - etcdAddr, _ := nameToEtcdURL(leader) - url = etcdAddr + path - } else { - raftAddr, _ := nameToRaftURL(leader) - url = raftAddr + path - } - - debugf("Redirect to %s", url) - - http.Redirect(w, req, url, http.StatusTemporaryRedirect) return nil } return etcdErr.NewError(300, "") @@ -227,9 +213,28 @@ func VersionHttpHandler(w http.ResponseWriter, req *http.Request) error { // Handler to return the basic stats of etcd func StatsHttpHandler(w http.ResponseWriter, req *http.Request) error { - w.WriteHeader(http.StatusOK) - w.Write(etcdStore.Stats()) - w.Write(r.Stats()) + option := req.URL.Path[len("/v1/stats/"):] + + switch option { + case "self": + w.WriteHeader(http.StatusOK) + w.Write(r.Stats()) + case "leader": + if r.State() == raft.Leader { + w.Write(r.PeerStats()) + } else { + leader := r.Leader() + // current no leader + if leader == "" { + return etcdErr.NewError(300, "") + } + redirect(leader, true, w, req) + } + case "store": + w.WriteHeader(http.StatusOK) + w.Write(etcdStore.Stats()) + } + return nil } diff --git a/raft_server.go b/raft_server.go index 9342e2997..ec3388aa9 100644 --- a/raft_server.go +++ b/raft_server.go @@ -24,7 +24,7 @@ type raftServer struct { listenHost string tlsConf *TLSConfig tlsInfo *TLSInfo - peersStats map[string]*raftPeerStats + peersStats *raftPeersStats serverStats *raftServerStats } @@ -48,7 +48,10 @@ func newRaftServer(name string, url string, listenHost string, tlsConf *TLSConfi listenHost: listenHost, tlsConf: tlsConf, tlsInfo: tlsInfo, - peersStats: make(map[string]*raftPeerStats), + peersStats: &raftPeersStats{ + Leader: name, + Peers: make(map[string]*raftPeerStats), + }, serverStats: &raftServerStats{ StartTime: time.Now(), sendRateQueue: &statsQueue{ @@ -63,7 +66,6 @@ func newRaftServer(name string, url string, listenHost string, tlsConf *TLSConfi // Start the raft server func (r *raftServer) ListenAndServe() { - // Setup commands. registerCommands() @@ -282,7 +284,7 @@ func joinByMachine(s *raft.Server, machine string, scheme string) error { } func (r *raftServer) Stats() []byte { - r.serverStats.LeaderUptime = time.Now().Sub(r.serverStats.leaderStartTime).String() + r.serverStats.LeaderInfo.Uptime = time.Now().Sub(r.serverStats.LeaderInfo.startTime).String() queue := r.serverStats.sendRateQueue @@ -292,20 +294,17 @@ func (r *raftServer) Stats() []byte { r.serverStats.RecvingPkgRate, r.serverStats.RecvingBandwidthRate = queue.Rate() - sBytes, err := json.Marshal(r.serverStats) + b, _ := json.Marshal(r.serverStats) - if err != nil { - warn(err) - } + return b +} +func (r *raftServer) PeerStats() []byte { if r.State() == raft.Leader { - pBytes, _ := json.Marshal(r.peersStats) - - b := append(sBytes, pBytes...) + b, _ := json.Marshal(r.peersStats) return b } - - return sBytes + return nil } // Register commands to raft server diff --git a/raft_stats.go b/raft_stats.go index 175a1be55..439c14ce9 100644 --- a/raft_stats.go +++ b/raft_stats.go @@ -33,10 +33,14 @@ func (ps *packageStats) Time() time.Time { } type raftServerStats struct { - State string `json:"state"` - StartTime time.Time `json:"startTime"` - Leader string `json:"leader"` - LeaderUptime string `json:"leaderUptime"` + State string `json:"state"` + StartTime time.Time `json:"startTime"` + + LeaderInfo struct { + Name string `json:"leader"` + Uptime string `json:"uptime"` + startTime time.Time + } `json:"leaderInfo"` RecvAppendRequestCnt uint64 `json:"recvAppendRequestCnt,"` RecvingPkgRate float64 `json:"recvPkgRate,omitempty"` @@ -46,16 +50,15 @@ type raftServerStats struct { SendingPkgRate float64 `json:"sendPkgRate,omitempty"` SendingBandwidthRate float64 `json:"sendBandwidthRate,omitempty"` - leaderStartTime time.Time - sendRateQueue *statsQueue - recvRateQueue *statsQueue + sendRateQueue *statsQueue + recvRateQueue *statsQueue } func (ss *raftServerStats) RecvAppendReq(leaderName string, pkgSize int) { ss.State = raft.Follower - if leaderName != ss.Leader { - ss.Leader = leaderName - ss.leaderStartTime = time.Now() + if leaderName != ss.LeaderInfo.Name { + ss.LeaderInfo.Name = leaderName + ss.LeaderInfo.startTime = time.Now() } ss.recvRateQueue.Insert(NewPackageStats(time.Now(), pkgSize)) @@ -64,17 +67,23 @@ func (ss *raftServerStats) RecvAppendReq(leaderName string, pkgSize int) { func (ss *raftServerStats) SendAppendReq(pkgSize int) { now := time.Now() + if ss.State != raft.Leader { ss.State = raft.Leader - ss.Leader = r.Name() - ss.leaderStartTime = now + ss.LeaderInfo.Name = r.Name() + ss.LeaderInfo.startTime = now } - ss.sendRateQueue.Insert(NewPackageStats(time.Now(), pkgSize)) + ss.sendRateQueue.Insert(NewPackageStats(now, pkgSize)) ss.SendAppendRequestCnt++ } +type raftPeersStats struct { + Leader string `json:"leader"` + Peers map[string]*raftPeerStats `json:"peers"` +} + type raftPeerStats struct { Latency float64 `json:"latency"` AvgLatency float64 `json:"averageLatency"` diff --git a/transporter.go b/transporter.go index 461741ce6..f0145b63c 100644 --- a/transporter.go +++ b/transporter.go @@ -66,7 +66,7 @@ func (t *transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.P debugf("Send LogEntries to %s ", u) - thisPeerStats, ok := r.peersStats[peer.Name] + thisPeerStats, ok := r.peersStats.Peers[peer.Name] start := time.Now() @@ -85,7 +85,7 @@ func (t *transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.P } } - r.peersStats[peer.Name] = thisPeerStats + r.peersStats.Peers[peer.Name] = thisPeerStats if resp != nil { defer resp.Body.Close() diff --git a/util.go b/util.go index 579f1c675..7fba80b3e 100644 --- a/util.go +++ b/util.go @@ -128,6 +128,23 @@ func sanitizeListenHost(listen string, advertised string) string { return net.JoinHostPort(listen, aport) } +func redirect(node string, etcd bool, w http.ResponseWriter, req *http.Request) { + var url string + path := req.URL.Path + + if etcd { + etcdAddr, _ := nameToEtcdURL(node) + url = etcdAddr + path + } else { + raftAddr, _ := nameToRaftURL(node) + url = raftAddr + path + } + + debugf("Redirect to %s", url) + + http.Redirect(w, req, url, http.StatusTemporaryRedirect) +} + func check(err error) { if err != nil { fatal(err) From d3fbf6d997b0cefc1c847f39d781057c2a9e1701 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Sat, 21 Sep 2013 23:09:20 -0400 Subject: [PATCH 2/7] bump 3rd party --- raft_server.go | 2 +- .../p/goprotobuf/proto/all_test.go | 70 ++++++++++++++++- .../p/goprotobuf/proto/decode.go | 16 +++- .../p/goprotobuf/proto/encode.go | 78 +++++++++++++------ .../p/goprotobuf/proto/extensions.go | 8 +- .../p/goprotobuf/proto/testdata/test.pb.go | 6 +- .../p/goprotobuf/proto/text_parser.go | 4 +- .../p/goprotobuf/proto/text_test.go | 2 +- .../protoc-gen-go/descriptor/descriptor.pb.go | 4 +- .../protoc-gen-go/generator/generator.go | 21 +++++ .../protoc-gen-go/testdata/my_test/test.pb.go | 2 +- .../testdata/my_test/test.pb.go.golden | 2 +- .../protoc-gen-go/testdata/my_test/test.proto | 2 +- .../github.com/ccding/go-logging/example.conf | 3 +- .../github.com/ccding/go-logging/example.go | 4 +- .../ccding/go-logging/logging/commands.go | 2 +- .../ccding/go-logging/logging/logging.go | 2 +- .../github.com/coreos/go-etcd/README.md | 47 ++++++++++- .../github.com/coreos/go-etcd/etcd/client.go | 5 +- .../github.com/coreos/go-etcd/etcd/version.go | 2 +- .../coreos/go-raft/append_entries_request.go | 4 +- .../go-raft/append_entries_request_test.go | 6 +- .../coreos/go-raft/append_entries_response.go | 4 +- .../go-raft/append_entries_response_test.go | 6 +- .../coreos/go-raft/http_transporter.go | 16 ++-- third_party/github.com/coreos/go-raft/log.go | 30 +++---- third_party/github.com/coreos/go-raft/peer.go | 6 ++ .../coreos/go-raft/request_vote_request.go | 4 +- .../coreos/go-raft/request_vote_response.go | 4 +- .../github.com/coreos/go-raft/server.go | 17 +++- .../github.com/coreos/go-raft/server_test.go | 2 +- third_party/github.com/coreos/go-raft/test.go | 4 +- 32 files changed, 288 insertions(+), 97 deletions(-) diff --git a/raft_server.go b/raft_server.go index 9342e2997..ca853c789 100644 --- a/raft_server.go +++ b/raft_server.go @@ -36,7 +36,7 @@ func newRaftServer(name string, url string, listenHost string, tlsConf *TLSConfi raftTransporter := newTransporter(tlsConf.Scheme, tlsConf.Client, ElectionTimeout) // Create raft server - server, err := raft.NewServer(name, dirPath, raftTransporter, etcdStore, nil) + server, err := raft.NewServer(name, dirPath, raftTransporter, etcdStore, nil, "") check(err) diff --git a/third_party/code.google.com/p/goprotobuf/proto/all_test.go b/third_party/code.google.com/p/goprotobuf/proto/all_test.go index 9d5115854..eec584104 100644 --- a/third_party/code.google.com/p/goprotobuf/proto/all_test.go +++ b/third_party/code.google.com/p/goprotobuf/proto/all_test.go @@ -431,7 +431,7 @@ func TestRequiredBit(t *testing.T) { err := o.Marshal(pb) if err == nil { t.Error("did not catch missing required fields") - } else if strings.Index(err.Error(), "GoTest") < 0 { + } else if strings.Index(err.Error(), "Kind") < 0 { t.Error("wrong error type:", err) } } @@ -1205,7 +1205,7 @@ func TestRequiredFieldEnforcement(t *testing.T) { _, err := Marshal(pb) if err == nil { t.Error("marshal: expected error, got nil") - } else if strings.Index(err.Error(), "GoTestField") < 0 { + } else if strings.Index(err.Error(), "Label") < 0 { t.Errorf("marshal: bad error type: %v", err) } @@ -1216,7 +1216,7 @@ func TestRequiredFieldEnforcement(t *testing.T) { err = Unmarshal(buf, pb) if err == nil { t.Error("unmarshal: expected error, got nil") - } else if strings.Index(err.Error(), "GoTestField") < 0 { + } else if strings.Index(err.Error(), "{Unknown}") < 0 { t.Errorf("unmarshal: bad error type: %v", err) } } @@ -1670,6 +1670,70 @@ func TestEncodingSizes(t *testing.T) { } } +func TestErrRequiredNotSet(t *testing.T) { + pb := initGoTest(false) + pb.RequiredField.Label = nil + pb.F_Int32Required = nil + pb.F_Int64Required = nil + + expected := "0807" + // field 1, encoding 0, value 7 + "2206" + "120474797065" + // field 4, encoding 2 (GoTestField) + "5001" + // field 10, encoding 0, value 1 + "6d20000000" + // field 13, encoding 5, value 0x20 + "714000000000000000" + // field 14, encoding 1, value 0x40 + "78a019" + // field 15, encoding 0, value 0xca0 = 3232 + "8001c032" + // field 16, encoding 0, value 0x1940 = 6464 + "8d0100004a45" + // field 17, encoding 5, value 3232.0 + "9101000000000040b940" + // field 18, encoding 1, value 6464.0 + "9a0106" + "737472696e67" + // field 19, encoding 2, string "string" + "b304" + // field 70, encoding 3, start group + "ba0408" + "7265717569726564" + // field 71, encoding 2, string "required" + "b404" + // field 70, encoding 4, end group + "aa0605" + "6279746573" + // field 101, encoding 2, string "bytes" + "b0063f" + // field 102, encoding 0, 0x3f zigzag32 + "b8067f" // field 103, encoding 0, 0x7f zigzag64 + + o := old() + bytes, err := Marshal(pb) + if _, ok := err.(*ErrRequiredNotSet); !ok { + fmt.Printf("marshal-1 err = %v, want *ErrRequiredNotSet", err) + o.DebugPrint("", bytes) + t.Fatalf("expected = %s", expected) + } + if strings.Index(err.Error(), "RequiredField.Label") < 0 { + t.Errorf("marshal-1 wrong err msg: %v", err) + } + if !equal(bytes, expected, t) { + o.DebugPrint("neq 1", bytes) + t.Fatalf("expected = %s", expected) + } + + // Now test Unmarshal by recreating the original buffer. + pbd := new(GoTest) + err = Unmarshal(bytes, pbd) + if _, ok := err.(*ErrRequiredNotSet); !ok { + t.Fatalf("unmarshal err = %v, want *ErrRequiredNotSet", err) + o.DebugPrint("", bytes) + t.Fatalf("string = %s", expected) + } + if strings.Index(err.Error(), "RequiredField.{Unknown}") < 0 { + t.Errorf("unmarshal wrong err msg: %v", err) + } + bytes, err = Marshal(pbd) + if _, ok := err.(*ErrRequiredNotSet); !ok { + t.Errorf("marshal-2 err = %v, want *ErrRequiredNotSet", err) + o.DebugPrint("", bytes) + t.Fatalf("string = %s", expected) + } + if strings.Index(err.Error(), "RequiredField.Label") < 0 { + t.Errorf("marshal-2 wrong err msg: %v", err) + } + if !equal(bytes, expected, t) { + o.DebugPrint("neq 2", bytes) + t.Fatalf("string = %s", expected) + } +} + func fuzzUnmarshal(t *testing.T, data []byte) { defer func() { if e := recover(); e != nil { diff --git a/third_party/code.google.com/p/goprotobuf/proto/decode.go b/third_party/code.google.com/p/goprotobuf/proto/decode.go index f951c01a7..3c58cfb9b 100644 --- a/third_party/code.google.com/p/goprotobuf/proto/decode.go +++ b/third_party/code.google.com/p/goprotobuf/proto/decode.go @@ -46,7 +46,7 @@ import ( // ErrWrongType occurs when the wire encoding for the field disagrees with // that specified in the type being decoded. This is usually caused by attempting // to convert an encoded protocol buffer into a struct of the wrong type. -var ErrWrongType = errors.New("field/encoding mismatch: wrong type for field") +var ErrWrongType = errors.New("proto: field/encoding mismatch: wrong type for field") // errOverflow is returned when an integer is too large to be represented. var errOverflow = errors.New("proto: integer overflow") @@ -353,6 +353,7 @@ func (p *Buffer) Unmarshal(pb Message) error { // unmarshalType does the work of unmarshaling a structure. func (o *Buffer) unmarshalType(st reflect.Type, prop *StructProperties, is_group bool, base structPointer) error { + var state errorState required, reqFields := prop.reqCount, uint64(0) var err error @@ -406,7 +407,10 @@ func (o *Buffer) unmarshalType(st reflect.Type, prop *StructProperties, is_group continue } } - err = dec(o, p, base) + decErr := dec(o, p, base) + if decErr != nil && !state.shouldContinue(decErr, p) { + err = decErr + } if err == nil && p.Required { // Successfully decoded a required field. if tag <= 64 { @@ -430,8 +434,14 @@ func (o *Buffer) unmarshalType(st reflect.Type, prop *StructProperties, is_group if is_group { return io.ErrUnexpectedEOF } + if state.err != nil { + return state.err + } if required > 0 { - return &ErrRequiredNotSet{st} + // Not enough information to determine the exact field. If we use extra + // CPU, we could determine the field only if the missing required field + // has a tag <= 64 and we check reqFields. + return &ErrRequiredNotSet{"{Unknown}"} } } return err diff --git a/third_party/code.google.com/p/goprotobuf/proto/encode.go b/third_party/code.google.com/p/goprotobuf/proto/encode.go index 9d592cd5e..d49ab84d2 100644 --- a/third_party/code.google.com/p/goprotobuf/proto/encode.go +++ b/third_party/code.google.com/p/goprotobuf/proto/encode.go @@ -37,6 +37,7 @@ package proto import ( "errors" + "fmt" "reflect" "sort" ) @@ -46,12 +47,16 @@ import ( // all been initialized. It is also the error returned if Unmarshal is // called with an encoded protocol buffer that does not include all the // required fields. +// +// When printed, ErrRequiredNotSet reports the first unset required field in a +// message. If the field cannot be precisely determined, it is reported as +// "{Unknown}". type ErrRequiredNotSet struct { - t reflect.Type + field string } func (e *ErrRequiredNotSet) Error() string { - return "proto: required fields not set in " + e.t.String() + return fmt.Sprintf("proto: required field %q not set", e.field) } var ( @@ -175,7 +180,8 @@ func Marshal(pb Message) ([]byte, error) { } p := NewBuffer(nil) err := p.Marshal(pb) - if err != nil { + var state errorState + if err != nil && !state.shouldContinue(err, nil) { return nil, err } return p.buf, err @@ -274,6 +280,7 @@ func isNil(v reflect.Value) bool { // Encode a message struct. func (o *Buffer) enc_struct_message(p *Properties, base structPointer) error { + var state errorState structp := structPointer_GetStructPointer(base, p.field) if structPointer_IsNil(structp) { return ErrNil @@ -283,7 +290,7 @@ func (o *Buffer) enc_struct_message(p *Properties, base structPointer) error { if p.isMarshaler { m := structPointer_Interface(structp, p.stype).(Marshaler) data, err := m.Marshal() - if err != nil { + if err != nil && !state.shouldContinue(err, nil) { return err } o.buf = append(o.buf, p.tagcode...) @@ -300,18 +307,19 @@ func (o *Buffer) enc_struct_message(p *Properties, base structPointer) error { nbuf := o.buf o.buf = obuf - if err != nil { + if err != nil && !state.shouldContinue(err, nil) { o.buffree(nbuf) return err } o.buf = append(o.buf, p.tagcode...) o.EncodeRawBytes(nbuf) o.buffree(nbuf) - return nil + return state.err } // Encode a group struct. func (o *Buffer) enc_struct_group(p *Properties, base structPointer) error { + var state errorState b := structPointer_GetStructPointer(base, p.field) if structPointer_IsNil(b) { return ErrNil @@ -319,11 +327,11 @@ func (o *Buffer) enc_struct_group(p *Properties, base structPointer) error { o.EncodeVarint(uint64((p.Tag << 3) | WireStartGroup)) err := o.enc_struct(p.stype, p.sprop, b) - if err != nil { + if err != nil && !state.shouldContinue(err, nil) { return err } o.EncodeVarint(uint64((p.Tag << 3) | WireEndGroup)) - return nil + return state.err } // Encode a slice of bools ([]bool). @@ -470,6 +478,7 @@ func (o *Buffer) enc_slice_string(p *Properties, base structPointer) error { // Encode a slice of message structs ([]*struct). func (o *Buffer) enc_slice_struct_message(p *Properties, base structPointer) error { + var state errorState s := structPointer_StructPointerSlice(base, p.field) l := s.Len() @@ -483,7 +492,7 @@ func (o *Buffer) enc_slice_struct_message(p *Properties, base structPointer) err if p.isMarshaler { m := structPointer_Interface(structp, p.stype).(Marshaler) data, err := m.Marshal() - if err != nil { + if err != nil && !state.shouldContinue(err, nil) { return err } o.buf = append(o.buf, p.tagcode...) @@ -498,7 +507,7 @@ func (o *Buffer) enc_slice_struct_message(p *Properties, base structPointer) err nbuf := o.buf o.buf = obuf - if err != nil { + if err != nil && !state.shouldContinue(err, nil) { o.buffree(nbuf) if err == ErrNil { return ErrRepeatedHasNil @@ -510,11 +519,12 @@ func (o *Buffer) enc_slice_struct_message(p *Properties, base structPointer) err o.buffree(nbuf) } - return nil + return state.err } // Encode a slice of group structs ([]*struct). func (o *Buffer) enc_slice_struct_group(p *Properties, base structPointer) error { + var state errorState s := structPointer_StructPointerSlice(base, p.field) l := s.Len() @@ -528,7 +538,7 @@ func (o *Buffer) enc_slice_struct_group(p *Properties, base structPointer) error err := o.enc_struct(p.stype, p.sprop, b) - if err != nil { + if err != nil && !state.shouldContinue(err, nil) { if err == ErrNil { return ErrRepeatedHasNil } @@ -537,7 +547,7 @@ func (o *Buffer) enc_slice_struct_group(p *Properties, base structPointer) error o.EncodeVarint(uint64((p.Tag << 3) | WireEndGroup)) } - return nil + return state.err } // Encode an extension map. @@ -569,7 +579,7 @@ func (o *Buffer) enc_map(p *Properties, base structPointer) error { // Encode a struct. func (o *Buffer) enc_struct(t reflect.Type, prop *StructProperties, base structPointer) error { - required := prop.reqCount + var state errorState // Encode fields in tag order so that decoders may use optimizations // that depend on the ordering. // http://code.google.com/apis/protocolbuffers/docs/encoding.html#order @@ -577,19 +587,15 @@ func (o *Buffer) enc_struct(t reflect.Type, prop *StructProperties, base structP p := prop.Prop[i] if p.enc != nil { err := p.enc(o, p, base) - if err != nil { + if err != nil && !state.shouldContinue(err, p) { if err != ErrNil { return err + } else if p.Required && state.err == nil { + state.err = &ErrRequiredNotSet{p.Name} } - } else if p.Required { - required-- } } } - // See if we encoded all required fields. - if required > 0 { - return &ErrRequiredNotSet{t} - } // Add unrecognized fields at the end. if prop.unrecField.IsValid() { @@ -599,5 +605,33 @@ func (o *Buffer) enc_struct(t reflect.Type, prop *StructProperties, base structP } } - return nil + return state.err +} + +// errorState maintains the first error that occurs and updates that error +// with additional context. +type errorState struct { + err error +} + +// shouldContinue reports whether encoding should continue upon encountering the +// given error. If the error is ErrRequiredNotSet, shouldContinue returns true +// and, if this is the first appearance of that error, remembers it for future +// reporting. +// +// If prop is not nil, it may update any error with additional context about the +// field with the error. +func (s *errorState) shouldContinue(err error, prop *Properties) bool { + // Ignore unset required fields. + reqNotSet, ok := err.(*ErrRequiredNotSet) + if !ok { + return false + } + if s.err == nil { + if prop != nil { + err = &ErrRequiredNotSet{prop.Name + "." + reqNotSet.field} + } + s.err = err + } + return true } diff --git a/third_party/code.google.com/p/goprotobuf/proto/extensions.go b/third_party/code.google.com/p/goprotobuf/proto/extensions.go index 1e1e4dc7d..e730b68dd 100644 --- a/third_party/code.google.com/p/goprotobuf/proto/extensions.go +++ b/third_party/code.google.com/p/goprotobuf/proto/extensions.go @@ -109,11 +109,11 @@ func isExtensionField(pb extendableProto, field int32) bool { func checkExtensionTypes(pb extendableProto, extension *ExtensionDesc) error { // Check the extended type. if a, b := reflect.TypeOf(pb), reflect.TypeOf(extension.ExtendedType); a != b { - return errors.New("bad extended type; " + b.String() + " does not extend " + a.String()) + return errors.New("proto: bad extended type; " + b.String() + " does not extend " + a.String()) } // Check the range. if !isExtensionField(pb, extension.Field) { - return errors.New("bad extension number; not in declared ranges") + return errors.New("proto: bad extension number; not in declared ranges") } return nil } @@ -272,7 +272,7 @@ func decodeExtension(b []byte, extension *ExtensionDesc) (interface{}, error) { func GetExtensions(pb Message, es []*ExtensionDesc) (extensions []interface{}, err error) { epb, ok := pb.(extendableProto) if !ok { - err = errors.New("not an extendable proto") + err = errors.New("proto: not an extendable proto") return } extensions = make([]interface{}, len(es)) @@ -292,7 +292,7 @@ func SetExtension(pb extendableProto, extension *ExtensionDesc, value interface{ } typ := reflect.TypeOf(extension.ExtensionType) if typ != reflect.TypeOf(value) { - return errors.New("bad extension value type") + return errors.New("proto: bad extension value type") } pb.ExtensionMap()[extension.Field] = Extension{desc: extension, value: value} diff --git a/third_party/code.google.com/p/goprotobuf/proto/testdata/test.pb.go b/third_party/code.google.com/p/goprotobuf/proto/testdata/test.pb.go index d5e2d5eda..b8e40cfc7 100644 --- a/third_party/code.google.com/p/goprotobuf/proto/testdata/test.pb.go +++ b/third_party/code.google.com/p/goprotobuf/proto/testdata/test.pb.go @@ -244,7 +244,7 @@ func (m *GoEnum) GetFoo() FOO { if m != nil && m.Foo != nil { return *m.Foo } - return 0 + return FOO_FOO1 } type GoTestField struct { @@ -378,7 +378,7 @@ func (m *GoTest) GetKind() GoTest_KIND { if m != nil && m.Kind != nil { return *m.Kind } - return 0 + return GoTest_VOID } func (m *GoTest) GetTable() string { @@ -1289,7 +1289,7 @@ func (m *MyMessage) GetBikeshed() MyMessage_Color { if m != nil && m.Bikeshed != nil { return *m.Bikeshed } - return 0 + return MyMessage_RED } func (m *MyMessage) GetSomegroup() *MyMessage_SomeGroup { diff --git a/third_party/code.google.com/p/goprotobuf/proto/text_parser.go b/third_party/code.google.com/p/goprotobuf/proto/text_parser.go index f39b90885..13827f636 100644 --- a/third_party/code.google.com/p/goprotobuf/proto/text_parser.go +++ b/third_party/code.google.com/p/goprotobuf/proto/text_parser.go @@ -193,8 +193,8 @@ func (p *textParser) advance() { } var ( - errBadUTF8 = errors.New("bad UTF-8") - errBadHex = errors.New("bad hexadecimal") + errBadUTF8 = errors.New("proto: bad UTF-8") + errBadHex = errors.New("proto: bad hexadecimal") ) func unquoteC(s string, quote rune) (string, error) { diff --git a/third_party/code.google.com/p/goprotobuf/proto/text_test.go b/third_party/code.google.com/p/goprotobuf/proto/text_test.go index f5d057462..c64b073c7 100644 --- a/third_party/code.google.com/p/goprotobuf/proto/text_test.go +++ b/third_party/code.google.com/p/goprotobuf/proto/text_test.go @@ -303,7 +303,7 @@ type limitedWriter struct { limit int } -var outOfSpace = errors.New("insufficient space") +var outOfSpace = errors.New("proto: insufficient space") func (w *limitedWriter) Write(p []byte) (n int, err error) { var avail = w.limit - w.b.Len() diff --git a/third_party/code.google.com/p/goprotobuf/protoc-gen-go/descriptor/descriptor.pb.go b/third_party/code.google.com/p/goprotobuf/protoc-gen-go/descriptor/descriptor.pb.go index 3acbe2991..0b34acb62 100644 --- a/third_party/code.google.com/p/goprotobuf/protoc-gen-go/descriptor/descriptor.pb.go +++ b/third_party/code.google.com/p/goprotobuf/protoc-gen-go/descriptor/descriptor.pb.go @@ -487,14 +487,14 @@ func (m *FieldDescriptorProto) GetLabel() FieldDescriptorProto_Label { if m != nil && m.Label != nil { return *m.Label } - return 0 + return FieldDescriptorProto_LABEL_OPTIONAL } func (m *FieldDescriptorProto) GetType() FieldDescriptorProto_Type { if m != nil && m.Type != nil { return *m.Type } - return 0 + return FieldDescriptorProto_TYPE_DOUBLE } func (m *FieldDescriptorProto) GetTypeName() string { diff --git a/third_party/code.google.com/p/goprotobuf/protoc-gen-go/generator/generator.go b/third_party/code.google.com/p/goprotobuf/protoc-gen-go/generator/generator.go index 413f3614e..0b769d454 100644 --- a/third_party/code.google.com/p/goprotobuf/protoc-gen-go/generator/generator.go +++ b/third_party/code.google.com/p/goprotobuf/protoc-gen-go/generator/generator.go @@ -1664,6 +1664,27 @@ func (g *Generator) generateMessage(message *Descriptor) { g.P("return false") case descriptor.FieldDescriptorProto_TYPE_STRING: g.P(`return ""`) + case descriptor.FieldDescriptorProto_TYPE_ENUM: + // The default default for an enum is the first value in the enum, + // not zero. + obj := g.ObjectNamed(field.GetTypeName()) + var enum *EnumDescriptor + if id, ok := obj.(*ImportedDescriptor); ok { + // The enum type has been publicly imported. + enum, _ = id.o.(*EnumDescriptor) + } else { + enum, _ = obj.(*EnumDescriptor) + } + if enum == nil { + log.Printf("don't know how to generate getter for %s", field.GetName()) + continue + } + if len(enum.Value) == 0 { + g.P("return 0 // empty enum") + } else { + first := enum.Value[0].GetName() + g.P("return ", g.DefaultPackageName(obj)+enum.prefix()+first) + } default: g.P("return 0") } diff --git a/third_party/code.google.com/p/goprotobuf/protoc-gen-go/testdata/my_test/test.pb.go b/third_party/code.google.com/p/goprotobuf/protoc-gen-go/testdata/my_test/test.pb.go index a9d538d20..cfe977758 100644 --- a/third_party/code.google.com/p/goprotobuf/protoc-gen-go/testdata/my_test/test.pb.go +++ b/third_party/code.google.com/p/goprotobuf/protoc-gen-go/testdata/my_test/test.pb.go @@ -199,7 +199,7 @@ func (m *Request) GetHue() Request_Color { if m != nil && m.Hue != nil { return *m.Hue } - return 0 + return Request_RED } func (m *Request) GetHat() HatType { diff --git a/third_party/code.google.com/p/goprotobuf/protoc-gen-go/testdata/my_test/test.pb.go.golden b/third_party/code.google.com/p/goprotobuf/protoc-gen-go/testdata/my_test/test.pb.go.golden index a9d538d20..cfe977758 100644 --- a/third_party/code.google.com/p/goprotobuf/protoc-gen-go/testdata/my_test/test.pb.go.golden +++ b/third_party/code.google.com/p/goprotobuf/protoc-gen-go/testdata/my_test/test.pb.go.golden @@ -199,7 +199,7 @@ func (m *Request) GetHue() Request_Color { if m != nil && m.Hue != nil { return *m.Hue } - return 0 + return Request_RED } func (m *Request) GetHat() HatType { diff --git a/third_party/code.google.com/p/goprotobuf/protoc-gen-go/testdata/my_test/test.proto b/third_party/code.google.com/p/goprotobuf/protoc-gen-go/testdata/my_test/test.proto index 478e697c2..551585d0c 100644 --- a/third_party/code.google.com/p/goprotobuf/protoc-gen-go/testdata/my_test/test.proto +++ b/third_party/code.google.com/p/goprotobuf/protoc-gen-go/testdata/my_test/test.proto @@ -58,7 +58,7 @@ message Request { } repeated int64 key = 1; // optional imp.ImportedMessage imported_message = 2; - optional Color hue = 3; + optional Color hue = 3; // no default optional HatType hat = 4 [default=FEDORA]; // optional imp.ImportedMessage.Owner owner = 6; optional float deadline = 7 [default=inf]; diff --git a/third_party/github.com/ccding/go-logging/example.conf b/third_party/github.com/ccding/go-logging/example.conf index ecc9a860e..d64b690b1 100644 --- a/third_party/github.com/ccding/go-logging/example.conf +++ b/third_party/github.com/ccding/go-logging/example.conf @@ -1,3 +1,2 @@ -name = example +name = logger3 sync = 0 - diff --git a/third_party/github.com/ccding/go-logging/example.go b/third_party/github.com/ccding/go-logging/example.go index 1b82842fa..06f664381 100644 --- a/third_party/github.com/ccding/go-logging/example.go +++ b/third_party/github.com/ccding/go-logging/example.go @@ -22,14 +22,14 @@ import ( ) func main() { - logger1, _ := logging.SimpleLogger("main") + logger1, _ := logging.SimpleLogger("logger1") logger1.SetLevel(logging.NOTSET) logger1.Error("this is a test from error") logger1.Debug("this is a test from debug") logger1.Notset("orz", time.Now().UnixNano()) logger1.Destroy() - logger2, _ := logging.RichLogger("main") + logger2, _ := logging.RichLogger("logger2") logger2.SetLevel(logging.DEBUG) logger2.Error("this is a test from error") logger2.Debug("this is a test from debug") diff --git a/third_party/github.com/ccding/go-logging/logging/commands.go b/third_party/github.com/ccding/go-logging/logging/commands.go index be1940598..4a222d7f9 100644 --- a/third_party/github.com/ccding/go-logging/logging/commands.go +++ b/third_party/github.com/ccding/go-logging/logging/commands.go @@ -16,7 +16,7 @@ package logging -// Logln receives log request from the client. The request includes a set of +// Log receives log request from the client. The request includes a set of // variables. func (logger *Logger) Log(level Level, v ...interface{}) { // Don't delete this calling. The calling is used to keep the same diff --git a/third_party/github.com/ccding/go-logging/logging/logging.go b/third_party/github.com/ccding/go-logging/logging/logging.go index 6467d94ef..0268f5770 100644 --- a/third_party/github.com/ccding/go-logging/logging/logging.go +++ b/third_party/github.com/ccding/go-logging/logging/logging.go @@ -97,7 +97,7 @@ func RichLogger(name string) (*Logger, error) { // FileLogger creates a new logger with file output. func FileLogger(name string, level Level, format string, timeFormat string, file string, sync bool) (*Logger, error) { - out, err := os.Create(file) + out, err := os.OpenFile(file, os.O_WRONLY|os.O_APPEND|os.O_CREATE, os.ModeAppend|0666) if err != nil { return nil, err } diff --git a/third_party/github.com/coreos/go-etcd/README.md b/third_party/github.com/coreos/go-etcd/README.md index ac87acdfa..7a8f8b34a 100644 --- a/third_party/github.com/coreos/go-etcd/README.md +++ b/third_party/github.com/coreos/go-etcd/README.md @@ -2,4 +2,49 @@ golang client library for etcd -This etcd client library is under heavy development. Check back soon for more docs. In the meantime, check out [etcd](https://github.com/coreos/etcd) for details on the client protocol. +This etcd client library is under heavy development. Check back soon for more +docs. In the meantime, check out [etcd](https://github.com/coreos/etcd) for +details on the client protocol. + +For usage see example below or look at godoc: [go-etcd/etcd](http://godoc.org/github.com/coreos/go-etcd/etcd) + +## Install + +```bash +go get github.com/coreos/go-etcd/etcd +``` + +## Examples + +Returning error values are not showed for the sake of simplicity, but you +should check them. + +```go +package main + +import ( + "fmt" + "github.com/coreos/go-etcd/etcd" +) + +func main() { + c := etcd.NewClient() // default binds to http://0.0.0.0:4001 + + // SET the value "bar" to the key "foo" with zero TTL + // returns a: *store.Response + res, _ := c.Set("foo", "bar", 0) + fmt.Printf("set response: %+v\n", res) + + // GET the value that is stored for the key "foo" + // return a slice: []*store.Response + values, _ := c.Get("foo") + for i, res := range values { // .. and print them out + fmt.Printf("[%d] get response: %+v\n", i, res) + } + + // DELETE the key "foo" + // returns a: *store.Response + res, _ = c.Delete("foo") + fmt.Printf("delete response: %+v\n", res) +} +``` diff --git a/third_party/github.com/coreos/go-etcd/etcd/client.go b/third_party/github.com/coreos/go-etcd/etcd/client.go index 723e0ebdc..941cb45e1 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/client.go +++ b/third_party/github.com/coreos/go-etcd/etcd/client.go @@ -117,7 +117,7 @@ func (c *Client) SyncCluster() bool { // sync cluster information by providing machine list func (c *Client) internalSyncCluster(machines []string) bool { for _, machine := range machines { - httpPath := c.createHttpPath(machine, "v1/machines") + httpPath := c.createHttpPath(machine, version+"/machines") resp, err := c.httpClient.Get(httpPath) if err != nil { // try another machine in the cluster @@ -236,11 +236,12 @@ func (c *Client) sendRequest(method string, _path string, body string) (*http.Re // try to connect the leader continue } else if resp.StatusCode == http.StatusInternalServerError { + resp.Body.Close() + retry++ if retry > 2*len(c.cluster.Machines) { return nil, errors.New("Cannot reach servers") } - resp.Body.Close() continue } else { logger.Debug("send.return.response ", httpPath) diff --git a/third_party/github.com/coreos/go-etcd/etcd/version.go b/third_party/github.com/coreos/go-etcd/etcd/version.go index b27956805..e84e7b5b7 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/version.go +++ b/third_party/github.com/coreos/go-etcd/etcd/version.go @@ -1,3 +1,3 @@ package etcd -var version = "v1" +const version = "v1" diff --git a/third_party/github.com/coreos/go-raft/append_entries_request.go b/third_party/github.com/coreos/go-raft/append_entries_request.go index 78338e4eb..af56f2a0c 100644 --- a/third_party/github.com/coreos/go-raft/append_entries_request.go +++ b/third_party/github.com/coreos/go-raft/append_entries_request.go @@ -31,7 +31,7 @@ func newAppendEntriesRequest(term uint64, prevLogIndex uint64, prevLogTerm uint6 // Encodes the AppendEntriesRequest to a buffer. Returns the number of bytes // written and any error that may have occurred. -func (req *AppendEntriesRequest) encode(w io.Writer) (int, error) { +func (req *AppendEntriesRequest) Encode(w io.Writer) (int, error) { protoEntries := make([]*protobuf.ProtoAppendEntriesRequest_ProtoLogEntry, len(req.Entries)) @@ -63,7 +63,7 @@ func (req *AppendEntriesRequest) encode(w io.Writer) (int, error) { // Decodes the AppendEntriesRequest from a buffer. Returns the number of bytes read and // any error that occurs. -func (req *AppendEntriesRequest) decode(r io.Reader) (int, error) { +func (req *AppendEntriesRequest) Decode(r io.Reader) (int, error) { data, err := ioutil.ReadAll(r) if err != nil { diff --git a/third_party/github.com/coreos/go-raft/append_entries_request_test.go b/third_party/github.com/coreos/go-raft/append_entries_request_test.go index ef6732fc4..d8cbce735 100644 --- a/third_party/github.com/coreos/go-raft/append_entries_request_test.go +++ b/third_party/github.com/coreos/go-raft/append_entries_request_test.go @@ -10,7 +10,7 @@ func BenchmarkAppendEntriesRequestEncoding(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { var buf bytes.Buffer - req.encode(&buf) + req.Encode(&buf) } b.SetBytes(int64(len(tmp))) } @@ -19,7 +19,7 @@ func BenchmarkAppendEntriesRequestDecoding(b *testing.B) { req, buf := createTestAppendEntriesRequest(2000) b.ResetTimer() for i := 0; i < b.N; i++ { - req.decode(bytes.NewReader(buf)) + req.Decode(bytes.NewReader(buf)) } b.SetBytes(int64(len(buf))) } @@ -34,7 +34,7 @@ func createTestAppendEntriesRequest(entryCount int) (*AppendEntriesRequest, []by req := newAppendEntriesRequest(1, 1, 1, 1, "leader", entries) var buf bytes.Buffer - req.encode(&buf) + req.Encode(&buf) return req, buf.Bytes() } diff --git a/third_party/github.com/coreos/go-raft/append_entries_response.go b/third_party/github.com/coreos/go-raft/append_entries_response.go index e2b02ae87..363af46cd 100644 --- a/third_party/github.com/coreos/go-raft/append_entries_response.go +++ b/third_party/github.com/coreos/go-raft/append_entries_response.go @@ -30,7 +30,7 @@ func newAppendEntriesResponse(term uint64, success bool, index uint64, commitInd // Encodes the AppendEntriesResponse to a buffer. Returns the number of bytes // written and any error that may have occurred. -func (resp *AppendEntriesResponse) encode(w io.Writer) (int, error) { +func (resp *AppendEntriesResponse) Encode(w io.Writer) (int, error) { pb := &protobuf.ProtoAppendEntriesResponse{ Term: proto.Uint64(resp.Term), Index: proto.Uint64(resp.Index), @@ -47,7 +47,7 @@ func (resp *AppendEntriesResponse) encode(w io.Writer) (int, error) { // Decodes the AppendEntriesResponse from a buffer. Returns the number of bytes read and // any error that occurs. -func (resp *AppendEntriesResponse) decode(r io.Reader) (int, error) { +func (resp *AppendEntriesResponse) Decode(r io.Reader) (int, error) { data, err := ioutil.ReadAll(r) if err != nil { diff --git a/third_party/github.com/coreos/go-raft/append_entries_response_test.go b/third_party/github.com/coreos/go-raft/append_entries_response_test.go index 038dcda76..f51ead1f8 100644 --- a/third_party/github.com/coreos/go-raft/append_entries_response_test.go +++ b/third_party/github.com/coreos/go-raft/append_entries_response_test.go @@ -10,7 +10,7 @@ func BenchmarkAppendEntriesResponseEncoding(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { var buf bytes.Buffer - req.encode(&buf) + req.Encode(&buf) } b.SetBytes(int64(len(tmp))) } @@ -19,7 +19,7 @@ func BenchmarkAppendEntriesResponseDecoding(b *testing.B) { req, buf := createTestAppendEntriesResponse(2000) b.ResetTimer() for i := 0; i < b.N; i++ { - req.decode(bytes.NewReader(buf)) + req.Decode(bytes.NewReader(buf)) } b.SetBytes(int64(len(buf))) } @@ -28,7 +28,7 @@ func createTestAppendEntriesResponse(entryCount int) (*AppendEntriesResponse, [] resp := newAppendEntriesResponse(1, true, 1, 1) var buf bytes.Buffer - resp.encode(&buf) + resp.Encode(&buf) return resp, buf.Bytes() } diff --git a/third_party/github.com/coreos/go-raft/http_transporter.go b/third_party/github.com/coreos/go-raft/http_transporter.go index 7dbcf5a40..e41fd817f 100644 --- a/third_party/github.com/coreos/go-raft/http_transporter.go +++ b/third_party/github.com/coreos/go-raft/http_transporter.go @@ -89,7 +89,7 @@ func (t *HTTPTransporter) Install(server *Server, mux HTTPMuxer) { // Sends an AppendEntries RPC to a peer. func (t *HTTPTransporter) SendAppendEntriesRequest(server *Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse { var b bytes.Buffer - if _, err := req.encode(&b); err != nil { + if _, err := req.Encode(&b); err != nil { traceln("transporter.ae.encoding.error:", err) return nil } @@ -106,7 +106,7 @@ func (t *HTTPTransporter) SendAppendEntriesRequest(server *Server, peer *Peer, r defer httpResp.Body.Close() resp := &AppendEntriesResponse{} - if _, err = resp.decode(httpResp.Body); err != nil && err != io.EOF { + if _, err = resp.Decode(httpResp.Body); err != nil && err != io.EOF { traceln("transporter.ae.decoding.error:", err) return nil } @@ -117,7 +117,7 @@ func (t *HTTPTransporter) SendAppendEntriesRequest(server *Server, peer *Peer, r // Sends a RequestVote RPC to a peer. func (t *HTTPTransporter) SendVoteRequest(server *Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse { var b bytes.Buffer - if _, err := req.encode(&b); err != nil { + if _, err := req.Encode(&b); err != nil { traceln("transporter.rv.encoding.error:", err) return nil } @@ -134,7 +134,7 @@ func (t *HTTPTransporter) SendVoteRequest(server *Server, peer *Peer, req *Reque defer httpResp.Body.Close() resp := &RequestVoteResponse{} - if _, err = resp.decode(httpResp.Body); err != nil && err != io.EOF { + if _, err = resp.Decode(httpResp.Body); err != nil && err != io.EOF { traceln("transporter.rv.decoding.error:", err) return nil } @@ -162,13 +162,13 @@ func (t *HTTPTransporter) appendEntriesHandler(server *Server) http.HandlerFunc traceln(server.Name(), "RECV /appendEntries") req := &AppendEntriesRequest{} - if _, err := req.decode(r.Body); err != nil { + if _, err := req.Decode(r.Body); err != nil { http.Error(w, "", http.StatusBadRequest) return } resp := server.AppendEntries(req) - if _, err := resp.encode(w); err != nil { + if _, err := resp.Encode(w); err != nil { http.Error(w, "", http.StatusInternalServerError) return } @@ -181,13 +181,13 @@ func (t *HTTPTransporter) requestVoteHandler(server *Server) http.HandlerFunc { traceln(server.Name(), "RECV /requestVote") req := &RequestVoteRequest{} - if _, err := req.decode(r.Body); err != nil { + if _, err := req.Decode(r.Body); err != nil { http.Error(w, "", http.StatusBadRequest) return } resp := server.RequestVote(req) - if _, err := resp.encode(w); err != nil { + if _, err := resp.Encode(w); err != nil { http.Error(w, "", http.StatusInternalServerError) return } diff --git a/third_party/github.com/coreos/go-raft/log.go b/third_party/github.com/coreos/go-raft/log.go index b686d317c..5733c2188 100644 --- a/third_party/github.com/coreos/go-raft/log.go +++ b/third_party/github.com/coreos/go-raft/log.go @@ -180,26 +180,23 @@ func (l *Log) open(path string) error { } break } - - // Append entry. - l.entries = append(l.entries, entry) - - if entry.Index <= l.commitIndex { - command, err := newCommand(entry.CommandName, entry.Command) - if err != nil { - continue + if entry.Index > l.startIndex { + // Append entry. + l.entries = append(l.entries, entry) + if entry.Index <= l.commitIndex { + command, err := newCommand(entry.CommandName, entry.Command) + if err != nil { + continue + } + l.ApplyFunc(command) } - l.ApplyFunc(command) + debugln("open.log.append log index ", entry.Index) } - debugln("open.log.append log index ", entry.Index) - readBytes += int64(n) } l.results = make([]*logResult, len(l.entries)) - l.compact(l.startIndex, l.startTerm) - debugln("open.log.recovery number of log ", len(l.entries)) return nil } @@ -273,6 +270,8 @@ func (l *Log) getEntriesAfter(index uint64, maxLogEntriesPerRequest uint64) ([]* entries := l.entries[index-l.startIndex:] length := len(entries) + traceln("log.entriesAfter: startIndex:", l.startIndex, " lenght", len(l.entries)) + if uint64(length) < maxLogEntriesPerRequest { // Determine the term at the given entry and return a subslice. return entries, l.entries[index-1-l.startIndex].Term @@ -353,7 +352,10 @@ func (l *Log) lastInfo() (index uint64, term uint64) { func (l *Log) updateCommitIndex(index uint64) { l.mutex.Lock() defer l.mutex.Unlock() - l.commitIndex = index + if index > l.commitIndex { + l.commitIndex = index + } + debugln("update.commit.index ", index) } // Updates the commit index and writes entries after that index to the stable storage. diff --git a/third_party/github.com/coreos/go-raft/peer.go b/third_party/github.com/coreos/go-raft/peer.go index 37b8c3fb7..7b116edbb 100644 --- a/third_party/github.com/coreos/go-raft/peer.go +++ b/third_party/github.com/coreos/go-raft/peer.go @@ -255,6 +255,12 @@ func (p *Peer) sendSnapshotRecoveryRequest() { req := newSnapshotRecoveryRequest(p.server.name, p.server.lastSnapshot) debugln("peer.snap.recovery.send: ", p.Name) resp := p.server.Transporter().SendSnapshotRecoveryRequest(p.server, p, req) + + if resp == nil { + debugln("peer.snap.recovery.timeout: ", p.Name) + return + } + if resp.Success { p.prevLogIndex = req.LastIndex } else { diff --git a/third_party/github.com/coreos/go-raft/request_vote_request.go b/third_party/github.com/coreos/go-raft/request_vote_request.go index a7571d8b3..c4e5ac697 100644 --- a/third_party/github.com/coreos/go-raft/request_vote_request.go +++ b/third_party/github.com/coreos/go-raft/request_vote_request.go @@ -28,7 +28,7 @@ func newRequestVoteRequest(term uint64, candidateName string, lastLogIndex uint6 // Encodes the RequestVoteRequest to a buffer. Returns the number of bytes // written and any error that may have occurred. -func (req *RequestVoteRequest) encode(w io.Writer) (int, error) { +func (req *RequestVoteRequest) Encode(w io.Writer) (int, error) { pb := &protobuf.ProtoRequestVoteRequest{ Term: proto.Uint64(req.Term), LastLogIndex: proto.Uint64(req.LastLogIndex), @@ -45,7 +45,7 @@ func (req *RequestVoteRequest) encode(w io.Writer) (int, error) { // Decodes the RequestVoteRequest from a buffer. Returns the number of bytes read and // any error that occurs. -func (req *RequestVoteRequest) decode(r io.Reader) (int, error) { +func (req *RequestVoteRequest) Decode(r io.Reader) (int, error) { data, err := ioutil.ReadAll(r) if err != nil { diff --git a/third_party/github.com/coreos/go-raft/request_vote_response.go b/third_party/github.com/coreos/go-raft/request_vote_response.go index 9ed1bc9b9..1870245a1 100644 --- a/third_party/github.com/coreos/go-raft/request_vote_response.go +++ b/third_party/github.com/coreos/go-raft/request_vote_response.go @@ -24,7 +24,7 @@ func newRequestVoteResponse(term uint64, voteGranted bool) *RequestVoteResponse // Encodes the RequestVoteResponse to a buffer. Returns the number of bytes // written and any error that may have occurred. -func (resp *RequestVoteResponse) encode(w io.Writer) (int, error) { +func (resp *RequestVoteResponse) Encode(w io.Writer) (int, error) { pb := &protobuf.ProtoRequestVoteResponse{ Term: proto.Uint64(resp.Term), VoteGranted: proto.Bool(resp.VoteGranted), @@ -40,7 +40,7 @@ func (resp *RequestVoteResponse) encode(w io.Writer) (int, error) { // Decodes the RequestVoteResponse from a buffer. Returns the number of bytes read and // any error that occurs. -func (resp *RequestVoteResponse) decode(r io.Reader) (int, error) { +func (resp *RequestVoteResponse) Decode(r io.Reader) (int, error) { data, err := ioutil.ReadAll(r) if err != nil { diff --git a/third_party/github.com/coreos/go-raft/server.go b/third_party/github.com/coreos/go-raft/server.go index b4dab92ae..e1a51a908 100644 --- a/third_party/github.com/coreos/go-raft/server.go +++ b/third_party/github.com/coreos/go-raft/server.go @@ -80,6 +80,8 @@ type Server struct { lastSnapshot *Snapshot stateMachine StateMachine maxLogEntriesPerRequest uint64 + + connectionString string } // An event to be processed by the server's event loop. @@ -96,7 +98,7 @@ type event struct { //------------------------------------------------------------------------------ // Creates a new server with a log at the given path. -func NewServer(name string, path string, transporter Transporter, stateMachine StateMachine, context interface{}) (*Server, error) { +func NewServer(name string, path string, transporter Transporter, stateMachine StateMachine, context interface{}, connectiongString string) (*Server, error) { if name == "" { return nil, errors.New("raft.Server: Name cannot be blank") } @@ -117,6 +119,7 @@ func NewServer(name string, path string, transporter Transporter, stateMachine S electionTimeout: DefaultElectionTimeout, heartbeatTimeout: DefaultHeartbeatTimeout, maxLogEntriesPerRequest: MaxLogEntriesPerRequest, + connectionString: connectiongString, } // Setup apply function. @@ -1009,10 +1012,16 @@ func (s *Server) TakeSnapshot() error { state = []byte{0} } - var peers []*Peer + peers := make([]*Peer, len(s.peers)+1) + i := 0 for _, peer := range s.peers { - peers = append(peers, peer.clone()) + peers[i] = peer.clone() + } + + peers[i] = &Peer{ + Name: s.Name(), + ConnectionString: s.connectionString, } s.currentSnapshot = &Snapshot{lastIndex, lastTerm, peers, state, path} @@ -1253,7 +1262,7 @@ func (s *Server) readConf() error { return err } - s.log.commitIndex = conf.CommitIndex + s.log.updateCommitIndex(conf.CommitIndex) return nil } diff --git a/third_party/github.com/coreos/go-raft/server_test.go b/third_party/github.com/coreos/go-raft/server_test.go index 2a1559970..01bc96b71 100644 --- a/third_party/github.com/coreos/go-raft/server_test.go +++ b/third_party/github.com/coreos/go-raft/server_test.go @@ -428,7 +428,7 @@ func TestServerRecoverFromPreviousLogAndConf(t *testing.T) { for _, name := range names { server := servers[name] if server.CommitIndex() != 17 { - t.Fatalf("%s commitIndex is invalid [%d/%d]", name, server.CommitIndex(), 16) + t.Fatalf("%s commitIndex is invalid [%d/%d]", name, server.CommitIndex(), 17) } server.Stop() } diff --git a/third_party/github.com/coreos/go-raft/test.go b/third_party/github.com/coreos/go-raft/test.go index 025cf0f58..95a6c7168 100644 --- a/third_party/github.com/coreos/go-raft/test.go +++ b/third_party/github.com/coreos/go-raft/test.go @@ -65,12 +65,12 @@ func newTestServer(name string, transporter Transporter) *Server { if err := os.MkdirAll(p, 0644); err != nil { panic(err.Error()) } - server, _ := NewServer(name, p, transporter, nil, nil) + server, _ := NewServer(name, p, transporter, nil, nil, "") return server } func newTestServerWithPath(name string, transporter Transporter, p string) *Server { - server, _ := NewServer(name, p, transporter, nil, nil) + server, _ := NewServer(name, p, transporter, nil, nil, "") return server } From 24b34d0a1e6d2c4b78b8e0567a62964f634e90f7 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Sun, 22 Sep 2013 01:32:49 -0400 Subject: [PATCH 3/7] bump 3rd party --- third_party/github.com/coreos/go-raft/server.go | 5 +++-- .../github.com/coreos/go-raft/snapshot_recovery_request.go | 4 ++-- .../github.com/coreos/go-raft/snapshot_recovery_response.go | 4 ++-- third_party/github.com/coreos/go-raft/snapshot_request.go | 4 ++-- third_party/github.com/coreos/go-raft/snapshot_response.go | 4 ++-- 5 files changed, 11 insertions(+), 10 deletions(-) diff --git a/third_party/github.com/coreos/go-raft/server.go b/third_party/github.com/coreos/go-raft/server.go index e1a51a908..645a2e4a0 100644 --- a/third_party/github.com/coreos/go-raft/server.go +++ b/third_party/github.com/coreos/go-raft/server.go @@ -98,7 +98,7 @@ type event struct { //------------------------------------------------------------------------------ // Creates a new server with a log at the given path. -func NewServer(name string, path string, transporter Transporter, stateMachine StateMachine, context interface{}, connectiongString string) (*Server, error) { +func NewServer(name string, path string, transporter Transporter, stateMachine StateMachine, context interface{}, connectionString string) (*Server, error) { if name == "" { return nil, errors.New("raft.Server: Name cannot be blank") } @@ -119,7 +119,7 @@ func NewServer(name string, path string, transporter Transporter, stateMachine S electionTimeout: DefaultElectionTimeout, heartbeatTimeout: DefaultHeartbeatTimeout, maxLogEntriesPerRequest: MaxLogEntriesPerRequest, - connectionString: connectiongString, + connectionString: connectionString, } // Setup apply function. @@ -1017,6 +1017,7 @@ func (s *Server) TakeSnapshot() error { i := 0 for _, peer := range s.peers { peers[i] = peer.clone() + i++ } peers[i] = &Peer{ diff --git a/third_party/github.com/coreos/go-raft/snapshot_recovery_request.go b/third_party/github.com/coreos/go-raft/snapshot_recovery_request.go index 57b3e3a88..a05f43108 100644 --- a/third_party/github.com/coreos/go-raft/snapshot_recovery_request.go +++ b/third_party/github.com/coreos/go-raft/snapshot_recovery_request.go @@ -35,7 +35,7 @@ func newSnapshotRecoveryRequest(leaderName string, snapshot *Snapshot) *Snapshot // Encodes the SnapshotRecoveryRequest to a buffer. Returns the number of bytes // written and any error that may have occurred. -func (req *SnapshotRecoveryRequest) encode(w io.Writer) (int, error) { +func (req *SnapshotRecoveryRequest) Encode(w io.Writer) (int, error) { protoPeers := make([]*protobuf.ProtoSnapshotRecoveryRequest_ProtoPeer, len(req.Peers)) @@ -63,7 +63,7 @@ func (req *SnapshotRecoveryRequest) encode(w io.Writer) (int, error) { // Decodes the SnapshotRecoveryRequest from a buffer. Returns the number of bytes read and // any error that occurs. -func (req *SnapshotRecoveryRequest) decode(r io.Reader) (int, error) { +func (req *SnapshotRecoveryRequest) Decode(r io.Reader) (int, error) { data, err := ioutil.ReadAll(r) if err != nil { diff --git a/third_party/github.com/coreos/go-raft/snapshot_recovery_response.go b/third_party/github.com/coreos/go-raft/snapshot_recovery_response.go index 2b2f1cde1..7e4d86ace 100644 --- a/third_party/github.com/coreos/go-raft/snapshot_recovery_response.go +++ b/third_party/github.com/coreos/go-raft/snapshot_recovery_response.go @@ -31,7 +31,7 @@ func newSnapshotRecoveryResponse(term uint64, success bool, commitIndex uint64) // Encodes the SnapshotRecoveryResponse to a buffer. Returns the number of bytes // written and any error that may have occurred. -func (req *SnapshotRecoveryResponse) encode(w io.Writer) (int, error) { +func (req *SnapshotRecoveryResponse) Encode(w io.Writer) (int, error) { pb := &protobuf.ProtoSnapshotRecoveryResponse{ Term: proto.Uint64(req.Term), Success: proto.Bool(req.Success), @@ -47,7 +47,7 @@ func (req *SnapshotRecoveryResponse) encode(w io.Writer) (int, error) { // Decodes the SnapshotRecoveryResponse from a buffer. Returns the number of bytes read and // any error that occurs. -func (req *SnapshotRecoveryResponse) decode(r io.Reader) (int, error) { +func (req *SnapshotRecoveryResponse) Decode(r io.Reader) (int, error) { data, err := ioutil.ReadAll(r) if err != nil { diff --git a/third_party/github.com/coreos/go-raft/snapshot_request.go b/third_party/github.com/coreos/go-raft/snapshot_request.go index c2f2cc768..3d75a52cc 100644 --- a/third_party/github.com/coreos/go-raft/snapshot_request.go +++ b/third_party/github.com/coreos/go-raft/snapshot_request.go @@ -31,7 +31,7 @@ func newSnapshotRequest(leaderName string, snapshot *Snapshot) *SnapshotRequest // Encodes the SnapshotRequest to a buffer. Returns the number of bytes // written and any error that may have occurred. -func (req *SnapshotRequest) encode(w io.Writer) (int, error) { +func (req *SnapshotRequest) Encode(w io.Writer) (int, error) { pb := &protobuf.ProtoSnapshotRequest{ LeaderName: proto.String(req.LeaderName), LastIndex: proto.Uint64(req.LastIndex), @@ -47,7 +47,7 @@ func (req *SnapshotRequest) encode(w io.Writer) (int, error) { // Decodes the SnapshotRequest from a buffer. Returns the number of bytes read and // any error that occurs. -func (req *SnapshotRequest) decode(r io.Reader) (int, error) { +func (req *SnapshotRequest) Decode(r io.Reader) (int, error) { data, err := ioutil.ReadAll(r) if err != nil { diff --git a/third_party/github.com/coreos/go-raft/snapshot_response.go b/third_party/github.com/coreos/go-raft/snapshot_response.go index 2e6c1c518..bd27f67af 100644 --- a/third_party/github.com/coreos/go-raft/snapshot_response.go +++ b/third_party/github.com/coreos/go-raft/snapshot_response.go @@ -27,7 +27,7 @@ func newSnapshotResponse(success bool) *SnapshotResponse { // Encodes the SnapshotResponse to a buffer. Returns the number of bytes // written and any error that may have occurred. -func (resp *SnapshotResponse) encode(w io.Writer) (int, error) { +func (resp *SnapshotResponse) Encode(w io.Writer) (int, error) { pb := &protobuf.ProtoSnapshotResponse{ Success: proto.Bool(resp.Success), } @@ -41,7 +41,7 @@ func (resp *SnapshotResponse) encode(w io.Writer) (int, error) { // Decodes the SnapshotResponse from a buffer. Returns the number of bytes read and // any error that occurs. -func (resp *SnapshotResponse) decode(r io.Reader) (int, error) { +func (resp *SnapshotResponse) Decode(r io.Reader) (int, error) { data, err := ioutil.ReadAll(r) if err != nil { From aff4af1d0b220629bfd2d2336bfc0a02ef926e97 Mon Sep 17 00:00:00 2001 From: Brandon Philips Date: Wed, 25 Sep 2013 17:04:05 -0700 Subject: [PATCH 4/7] fix(build): use /bin/sh it seems to work with bash -o posix. Ship it. --- build | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build b/build index a4fe58737..b121ba30d 100755 --- a/build +++ b/build @@ -1,4 +1,4 @@ -#!/bin/bash +#!/bin/sh ETCD_PACKAGE=github.com/coreos/etcd export GOPATH="${PWD}" From 0ef9d944f660065b1fbf70eacbdc48d049e8a533 Mon Sep 17 00:00:00 2001 From: Brandon Philips Date: Wed, 25 Sep 2013 17:04:34 -0700 Subject: [PATCH 5/7] fix(gitignore): ignore the actual binary --- .gitignore | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index c16223f09..8459ca713 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,4 @@ src/ pkg/ -./etcd +/etcd release_version.go From cbd8a4fb9c97950c7b4d7bbc6e60bd5e64746144 Mon Sep 17 00:00:00 2001 From: Brandon Philips Date: Thu, 26 Sep 2013 10:40:33 -0700 Subject: [PATCH 6/7] feat(scripts/test-cluster): add a cluster test command this uses tmux to setup a test cluster that you can easily kill and start for debugging. --- .gitignore | 1 + scripts/test-cluster | 19 +++++++++++++++++++ 2 files changed, 20 insertions(+) create mode 100755 scripts/test-cluster diff --git a/.gitignore b/.gitignore index 8459ca713..d00d899e2 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,4 @@ src/ pkg/ /etcd release_version.go +/machine* diff --git a/scripts/test-cluster b/scripts/test-cluster new file mode 100755 index 000000000..ccdedd1b7 --- /dev/null +++ b/scripts/test-cluster @@ -0,0 +1,19 @@ +#!/bin/bash +SESSION=etcd-cluster + +tmux new-session -d -s $SESSION + +# Setup a window for tailing log files +tmux new-window -t $SESSION:1 -n 'machines' +tmux split-window -h +tmux select-pane -t 0 +tmux send-keys "./etcd -s 127.0.0.1:7001 -c 127.0.0.1:4001 -d machine1 -n machine1" C-m + +for i in 2 3; do + tmux select-pane -t 0 + tmux split-window -v + tmux send-keys "./etcd -cors='*' -s 127.0.0.1:700${i} -c 127.0.0.1:400${i} -C 127.0.0.1:7001 -d machine${i} -n machine${i}" C-m +done + +# Attach to session +tmux attach-session -t $SESSION From da01fe602774341353b8eebeff0f1c845dc80eb3 Mon Sep 17 00:00:00 2001 From: Brandon Philips Date: Thu, 26 Sep 2013 12:17:54 -0700 Subject: [PATCH 7/7] fix(command): make Latency and Counts objects instead of suffixing everything make a latency object --- command.go | 3 ++- raft_stats.go | 45 +++++++++++++++++++++++++-------------------- 2 files changed, 27 insertions(+), 21 deletions(-) diff --git a/command.go b/command.go index dc0684707..a469a53fd 100644 --- a/command.go +++ b/command.go @@ -172,7 +172,8 @@ func (c *JoinCommand) Apply(raftServer *raft.Server) (interface{}, error) { // add peer stats if c.Name != r.Name() { - r.peersStats.Peers[c.Name] = &raftPeerStats{MinLatency: 1 << 63} + r.peersStats.Peers[c.Name] = &raftPeerStats{} + r.peersStats.Peers[c.Name].Latency.Minimum = 1 << 63 } return b, err diff --git a/raft_stats.go b/raft_stats.go index 439c14ce9..f17945ea6 100644 --- a/raft_stats.go +++ b/raft_stats.go @@ -85,43 +85,48 @@ type raftPeersStats struct { } type raftPeerStats struct { - Latency float64 `json:"latency"` - AvgLatency float64 `json:"averageLatency"` - avgLatencySquare float64 - SdvLatency float64 `json:"sdvLatency"` - MinLatency float64 `json:"minLatency"` - MaxLatency float64 `json:"maxLatency"` - FailCnt uint64 `json:"failsCount"` - SuccCnt uint64 `json:"successCount"` + Latency struct { + Current float64 `json:"current"` + Average float64 `json:"average"` + averageSquare float64 + StandardDeviation float64 `json:"standardDeviation"` + Minimum float64 `json:"minimum"` + Maximum float64 `json:"maximum"` + } `json:"latency"` + + Counts struct { + Fail uint64 `json:"fail"` + Success uint64 `json:"success"` + } `json:"counts"` } // Succ function update the raftPeerStats with a successful send func (ps *raftPeerStats) Succ(d time.Duration) { - total := float64(ps.SuccCnt) * ps.AvgLatency - totalSquare := float64(ps.SuccCnt) * ps.avgLatencySquare + total := float64(ps.Counts.Success) * ps.Latency.Average + totalSquare := float64(ps.Counts.Success) * ps.Latency.averageSquare - ps.SuccCnt++ + ps.Counts.Success++ - ps.Latency = float64(d) / (1000000.0) + ps.Latency.Current = float64(d) / (1000000.0) - if ps.Latency > ps.MaxLatency { - ps.MaxLatency = ps.Latency + if ps.Latency.Current > ps.Latency.Maximum { + ps.Latency.Maximum = ps.Latency.Current } - if ps.Latency < ps.MinLatency { - ps.MinLatency = ps.Latency + if ps.Latency.Current < ps.Latency.Minimum { + ps.Latency.Minimum = ps.Latency.Current } - ps.AvgLatency = (total + ps.Latency) / float64(ps.SuccCnt) - ps.avgLatencySquare = (totalSquare + ps.Latency*ps.Latency) / float64(ps.SuccCnt) + ps.Latency.Average = (total + ps.Latency.Current) / float64(ps.Counts.Success) + ps.Latency.averageSquare = (totalSquare + ps.Latency.Current * ps.Latency.Current) / float64(ps.Counts.Success) // sdv = sqrt(avg(x^2) - avg(x)^2) - ps.SdvLatency = math.Sqrt(ps.avgLatencySquare - ps.AvgLatency*ps.AvgLatency) + ps.Latency.StandardDeviation = math.Sqrt(ps.Latency.averageSquare - ps.Latency.Average*ps.Latency.Average) } // Fail function update the raftPeerStats with a unsuccessful send func (ps *raftPeerStats) Fail() { - ps.FailCnt++ + ps.Counts.Fail++ } type statsQueue struct {