diff --git a/.godir b/.godir new file mode 100644 index 000000000..00ff6aa80 --- /dev/null +++ b/.godir @@ -0,0 +1 @@ +github.com/coreos/etcd diff --git a/etcdserver/etcdhttp/peer.go b/etcdserver/etcdhttp/peer.go index ec3aa430b..9f13976ca 100644 --- a/etcdserver/etcdhttp/peer.go +++ b/etcdserver/etcdhttp/peer.go @@ -18,14 +18,11 @@ package etcdhttp import ( "encoding/json" - "io/ioutil" "log" "net/http" - "github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context" "github.com/coreos/etcd/etcdserver" - "github.com/coreos/etcd/pkg/types" - "github.com/coreos/etcd/raft/raftpb" + "github.com/coreos/etcd/rafthttp" ) const ( @@ -35,12 +32,7 @@ const ( // NewPeerHandler generates an http.Handler to handle etcd peer (raft) requests. func NewPeerHandler(server *etcdserver.EtcdServer) http.Handler { - rh := &raftHandler{ - stats: server, - server: server, - clusterInfo: server.Cluster, - } - + rh := rafthttp.NewHandler(server, server.Cluster.ID()) mh := &peerMembersHandler{ clusterInfo: server.Cluster, } @@ -52,55 +44,6 @@ func NewPeerHandler(server *etcdserver.EtcdServer) http.Handler { return mux } -type raftHandler struct { - stats etcdserver.Stats - server etcdserver.Server - clusterInfo etcdserver.ClusterInfo -} - -func (h *raftHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - if !allowMethod(w, r.Method, "POST") { - return - } - - wcid := h.clusterInfo.ID().String() - w.Header().Set("X-Etcd-Cluster-ID", wcid) - - gcid := r.Header.Get("X-Etcd-Cluster-ID") - if gcid != wcid { - log.Printf("etcdhttp: request ignored due to cluster ID mismatch got %s want %s", gcid, wcid) - http.Error(w, "clusterID mismatch", http.StatusPreconditionFailed) - return - } - - b, err := ioutil.ReadAll(r.Body) - if err != nil { - log.Println("etcdhttp: error reading raft message:", err) - http.Error(w, "error reading raft message", http.StatusBadRequest) - return - } - var m raftpb.Message - if err := m.Unmarshal(b); err != nil { - log.Println("etcdhttp: error unmarshaling raft message:", err) - http.Error(w, "error unmarshaling raft message", http.StatusBadRequest) - return - } - if err := h.server.Process(context.TODO(), m); err != nil { - switch err { - case etcdserver.ErrRemoved: - log.Printf("etcdhttp: reject message from removed member %s", types.ID(m.From).String()) - http.Error(w, "cannot process message from removed member", http.StatusForbidden) - default: - writeError(w, err) - } - return - } - if m.Type == raftpb.MsgApp { - h.stats.UpdateRecvApp(types.ID(m.From), r.ContentLength) - } - w.WriteHeader(http.StatusNoContent) -} - type peerMembersHandler struct { clusterInfo etcdserver.ClusterInfo } diff --git a/etcdserver/etcdhttp/peer_test.go b/etcdserver/etcdhttp/peer_test.go index 495d9eb4a..29e8b0dc1 100644 --- a/etcdserver/etcdhttp/peer_test.go +++ b/etcdserver/etcdhttp/peer_test.go @@ -17,165 +17,15 @@ package etcdhttp import ( - "bytes" "encoding/json" - "errors" - "io" "net/http" "net/http/httptest" "path" - "strings" "testing" "github.com/coreos/etcd/etcdserver" - "github.com/coreos/etcd/raft/raftpb" ) -func mustMarshalMsg(t *testing.T, m raftpb.Message) []byte { - json, err := m.Marshal() - if err != nil { - t.Fatalf("error marshalling raft Message: %#v", err) - } - return json -} - -// errReader implements io.Reader to facilitate a broken request. -type errReader struct{} - -func (er *errReader) Read(_ []byte) (int, error) { return 0, errors.New("some error") } - -func TestServeRaft(t *testing.T) { - testCases := []struct { - method string - body io.Reader - serverErr error - clusterID string - - wcode int - }{ - { - // bad method - "GET", - bytes.NewReader( - mustMarshalMsg( - t, - raftpb.Message{}, - ), - ), - nil, - "0", - http.StatusMethodNotAllowed, - }, - { - // bad method - "PUT", - bytes.NewReader( - mustMarshalMsg( - t, - raftpb.Message{}, - ), - ), - nil, - "0", - http.StatusMethodNotAllowed, - }, - { - // bad method - "DELETE", - bytes.NewReader( - mustMarshalMsg( - t, - raftpb.Message{}, - ), - ), - nil, - "0", - http.StatusMethodNotAllowed, - }, - { - // bad request body - "POST", - &errReader{}, - nil, - "0", - http.StatusBadRequest, - }, - { - // bad request protobuf - "POST", - strings.NewReader("malformed garbage"), - nil, - "0", - http.StatusBadRequest, - }, - { - // good request, etcdserver.Server internal error - "POST", - bytes.NewReader( - mustMarshalMsg( - t, - raftpb.Message{}, - ), - ), - errors.New("some error"), - "0", - http.StatusInternalServerError, - }, - { - // good request from removed member - "POST", - bytes.NewReader( - mustMarshalMsg( - t, - raftpb.Message{}, - ), - ), - etcdserver.ErrRemoved, - "0", - http.StatusForbidden, - }, - { - // good request - "POST", - bytes.NewReader( - mustMarshalMsg( - t, - raftpb.Message{}, - ), - ), - nil, - "1", - http.StatusPreconditionFailed, - }, - { - // good request - "POST", - bytes.NewReader( - mustMarshalMsg( - t, - raftpb.Message{}, - ), - ), - nil, - "0", - http.StatusNoContent, - }, - } - for i, tt := range testCases { - req, err := http.NewRequest(tt.method, "foo", tt.body) - if err != nil { - t.Fatalf("#%d: could not create request: %#v", i, err) - } - req.Header.Set("X-Etcd-Cluster-ID", tt.clusterID) - rw := httptest.NewRecorder() - h := &raftHandler{stats: nil, server: &errServer{tt.serverErr}, clusterInfo: &fakeCluster{id: 0}} - h.ServeHTTP(rw, req) - if rw.Code != tt.wcode { - t.Errorf("#%d: got code=%d, want %d", i, rw.Code, tt.wcode) - } - } -} - func TestServeMembersFails(t *testing.T) { tests := []struct { method string diff --git a/etcdserver/sendhub.go b/etcdserver/sendhub.go new file mode 100644 index 000000000..ad1fa18fe --- /dev/null +++ b/etcdserver/sendhub.go @@ -0,0 +1,124 @@ +/* + Copyright 2014 CoreOS, Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package etcdserver + +import ( + "log" + "net/http" + "net/url" + "path" + + "github.com/coreos/etcd/etcdserver/stats" + "github.com/coreos/etcd/pkg/types" + "github.com/coreos/etcd/raft/raftpb" + "github.com/coreos/etcd/rafthttp" +) + +const ( + raftPrefix = "/raft" +) + +type sendHub struct { + tr http.RoundTripper + cl ClusterInfo + ss *stats.ServerStats + ls *stats.LeaderStats + senders map[types.ID]rafthttp.Sender + shouldstop chan struct{} +} + +// newSendHub creates the default send hub used to transport raft messages +// to other members. The returned sendHub will update the given ServerStats and +// LeaderStats appropriately. +func newSendHub(t http.RoundTripper, cl ClusterInfo, ss *stats.ServerStats, ls *stats.LeaderStats) *sendHub { + h := &sendHub{ + tr: t, + cl: cl, + ss: ss, + ls: ls, + senders: make(map[types.ID]rafthttp.Sender), + shouldstop: make(chan struct{}, 1), + } + for _, m := range cl.Members() { + h.Add(m) + } + return h +} + +func (h *sendHub) Send(msgs []raftpb.Message) { + for _, m := range msgs { + to := types.ID(m.To) + s, ok := h.senders[to] + if !ok { + if !h.cl.IsIDRemoved(to) { + log.Printf("etcdserver: send message to unknown receiver %s", to) + } + continue + } + + if m.Type == raftpb.MsgApp { + h.ss.SendAppendReq(m.Size()) + } + + s.Send(m) + } +} + +func (h *sendHub) Stop() { + for _, s := range h.senders { + s.Stop() + } +} + +func (h *sendHub) ShouldStopNotify() <-chan struct{} { + return h.shouldstop +} + +func (h *sendHub) Add(m *Member) { + if _, ok := h.senders[m.ID]; ok { + return + } + // TODO: considering how to switch between all available peer urls + peerURL := m.PickPeerURL() + u, err := url.Parse(peerURL) + if err != nil { + log.Panicf("unexpect peer url %s", peerURL) + } + u.Path = path.Join(u.Path, raftPrefix) + fs := h.ls.Follower(m.ID.String()) + s := rafthttp.NewSender(h.tr, u.String(), h.cl.ID(), fs, h.shouldstop) + h.senders[m.ID] = s +} + +func (h *sendHub) Remove(id types.ID) { + h.senders[id].Stop() + delete(h.senders, id) +} + +func (h *sendHub) Update(m *Member) { + // TODO: return error or just panic? + if _, ok := h.senders[m.ID]; !ok { + return + } + peerURL := m.PickPeerURL() + u, err := url.Parse(peerURL) + if err != nil { + log.Panicf("unexpect peer url %s", peerURL) + } + u.Path = path.Join(u.Path, raftPrefix) + h.senders[m.ID].Update(u.String()) +} diff --git a/etcdserver/sendhub_test.go b/etcdserver/sendhub_test.go new file mode 100644 index 000000000..c5bbdb924 --- /dev/null +++ b/etcdserver/sendhub_test.go @@ -0,0 +1,127 @@ +/* + Copyright 2014 CoreOS, Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package etcdserver + +import ( + "net/http" + "testing" + "time" + + "github.com/coreos/etcd/etcdserver/stats" + "github.com/coreos/etcd/pkg/testutil" + "github.com/coreos/etcd/pkg/types" + "github.com/coreos/etcd/raft/raftpb" +) + +func TestSendHubInitSenders(t *testing.T) { + membs := []*Member{ + newTestMember(1, []string{"http://a"}, "", nil), + newTestMember(2, []string{"http://b"}, "", nil), + newTestMember(3, []string{"http://c"}, "", nil), + } + cl := newTestCluster(membs) + ls := stats.NewLeaderStats("") + h := newSendHub(nil, cl, nil, ls) + + ids := cl.MemberIDs() + if len(h.senders) != len(ids) { + t.Errorf("len(ids) = %d, want %d", len(h.senders), len(ids)) + } + for _, id := range ids { + if _, ok := h.senders[id]; !ok { + t.Errorf("senders[%s] is nil, want exists", id) + } + } +} + +func TestSendHubAdd(t *testing.T) { + cl := newTestCluster(nil) + ls := stats.NewLeaderStats("") + h := newSendHub(nil, cl, nil, ls) + m := newTestMember(1, []string{"http://a"}, "", nil) + h.Add(m) + + if _, ok := ls.Followers["1"]; !ok { + t.Errorf("FollowerStats[1] is nil, want exists") + } + s, ok := h.senders[types.ID(1)] + if !ok { + t.Fatalf("senders[1] is nil, want exists") + } + + h.Add(m) + ns := h.senders[types.ID(1)] + if s != ns { + t.Errorf("sender = %p, want %p", ns, s) + } +} + +func TestSendHubRemove(t *testing.T) { + membs := []*Member{ + newTestMember(1, []string{"http://a"}, "", nil), + } + cl := newTestCluster(membs) + ls := stats.NewLeaderStats("") + h := newSendHub(nil, cl, nil, ls) + h.Remove(types.ID(1)) + + if _, ok := h.senders[types.ID(1)]; ok { + t.Fatalf("senders[1] exists, want removed") + } +} + +func TestSendHubShouldStop(t *testing.T) { + membs := []*Member{ + newTestMember(1, []string{"http://a"}, "", nil), + } + tr := newRespRoundTripper(http.StatusForbidden, nil) + cl := newTestCluster(membs) + ls := stats.NewLeaderStats("") + h := newSendHub(tr, cl, nil, ls) + + shouldstop := h.ShouldStopNotify() + select { + case <-shouldstop: + t.Fatalf("received unexpected shouldstop notification") + case <-time.After(10 * time.Millisecond): + } + h.senders[1].Send(raftpb.Message{}) + + testutil.ForceGosched() + select { + case <-shouldstop: + default: + t.Fatalf("cannot receive stop notification") + } +} + +type respRoundTripper struct { + code int + err error +} + +func newRespRoundTripper(code int, err error) *respRoundTripper { + return &respRoundTripper{code: code, err: err} +} +func (t *respRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { + return &http.Response{StatusCode: t.code, Body: &nopReadCloser{}}, t.err +} + +type nopReadCloser struct{} + +func (n *nopReadCloser) Read(p []byte) (int, error) { return 0, nil } +func (n *nopReadCloser) Close() error { return nil } diff --git a/etcdserver/server.go b/etcdserver/server.go index 8067a929f..c86970c3f 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -33,6 +33,7 @@ import ( "github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context" "github.com/coreos/etcd/discovery" + "github.com/coreos/etcd/etcdserver/etcdhttp/httptypes" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/etcdserver/stats" "github.com/coreos/etcd/pkg/pbutil" @@ -61,7 +62,6 @@ const ( var ( ErrUnknownMethod = errors.New("etcdserver: unknown method") ErrStopped = errors.New("etcdserver: server stopped") - ErrRemoved = errors.New("etcdserver: server removed") ErrIDRemoved = errors.New("etcdserver: ID removed") ErrIDExists = errors.New("etcdserver: ID exists") ErrIDNotFound = errors.New("etcdserver: ID not found") @@ -145,8 +145,6 @@ type Stats interface { LeaderStats() []byte // StoreStats returns statistics of the store backing this EtcdServer StoreStats() []byte - // UpdateRecvApp updates the underlying statistics in response to a receiving an Append request - UpdateRecvApp(from types.ID, length int64) } type RaftTimer interface { @@ -323,7 +321,11 @@ func (s *EtcdServer) ID() types.ID { return s.id } func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error { if s.Cluster.IsIDRemoved(types.ID(m.From)) { - return ErrRemoved + log.Printf("etcdserver: reject message from removed member %s", types.ID(m.From).String()) + return httptypes.NewHTTPError(http.StatusForbidden, "cannot process message from removed member") + } + if m.Type == raftpb.MsgApp { + s.stats.RecvAppendReq(types.ID(m.From).String(), m.Size()) } return s.node.Step(ctx, m) } @@ -493,10 +495,6 @@ func (s *EtcdServer) LeaderStats() []byte { func (s *EtcdServer) StoreStats() []byte { return s.store.JsonStats() } -func (s *EtcdServer) UpdateRecvApp(from types.ID, length int64) { - s.stats.RecvAppendReq(from.String(), int(length)) -} - func (s *EtcdServer) AddMember(ctx context.Context, memb Member) error { // TODO: move Member to protobuf type b, err := json.Marshal(memb) diff --git a/raft/log.go b/raft/log.go index 17d17a252..5666ed1c5 100644 --- a/raft/log.go +++ b/raft/log.go @@ -86,11 +86,7 @@ func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry default: l.append(ci-1, ents[ci-from:]...) } - tocommit := min(committed, lastnewi) - // if toCommit > commitIndex, set commitIndex = toCommit - if l.committed < tocommit { - l.committed = tocommit - } + l.commitTo(min(committed, lastnewi)) return lastnewi, true } return 0, false @@ -171,6 +167,16 @@ func (l *raftLog) lastIndex() uint64 { return index } +func (l *raftLog) commitTo(tocommit uint64) { + // never decrease commit + if l.committed < tocommit { + if l.lastIndex() < tocommit { + panic("committed out of range") + } + l.committed = tocommit + } +} + func (l *raftLog) appliedTo(i uint64) { if i == 0 { return @@ -235,7 +241,7 @@ func (l *raftLog) matchTerm(i, term uint64) bool { func (l *raftLog) maybeCommit(maxIndex, term uint64) bool { if maxIndex > l.committed && l.term(maxIndex) == term { - l.committed = maxIndex + l.commitTo(maxIndex) return true } return false diff --git a/raft/log_test.go b/raft/log_test.go index ee3f387f0..8ea5cb8b8 100644 --- a/raft/log_test.go +++ b/raft/log_test.go @@ -400,6 +400,38 @@ func TestUnstableEnts(t *testing.T) { } } +func TestCommitTo(t *testing.T) { + previousEnts := []pb.Entry{{Term: 1, Index: 1}, {Term: 2, Index: 2}, {Term: 3, Index: 3}} + commit := uint64(2) + tests := []struct { + commit uint64 + wcommit uint64 + wpanic bool + }{ + {3, 3, false}, + {1, 2, false}, // never decrease + {4, 0, true}, // commit out of range -> panic + } + for i, tt := range tests { + func() { + defer func() { + if r := recover(); r != nil { + if tt.wpanic != true { + t.Errorf("%d: panic = %v, want %v", i, true, tt.wpanic) + } + } + }() + raftLog := newLog(NewMemoryStorage()) + raftLog.append(0, previousEnts...) + raftLog.committed = commit + raftLog.commitTo(tt.commit) + if raftLog.committed != tt.wcommit { + t.Errorf("#%d: committed = %d, want %d", i, raftLog.committed, tt.wcommit) + } + }() + } +} + func TestStableTo(t *testing.T) { tests := []struct { stable uint64 diff --git a/raft/raft.go b/raft/raft.go index 4dd9c2a1a..c7d8b1db8 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -63,12 +63,26 @@ func (pr *progress) update(n uint64) { pr.next = n + 1 } +func (pr *progress) optimisticUpdate(n uint64) { + pr.next = n + 1 +} + // maybeDecrTo returns false if the given to index comes from an out of order message. // Otherwise it decreases the progress next index and returns true. func (pr *progress) maybeDecrTo(to uint64) bool { - // the rejection must be stale if the progress has matched with - // follower or "to" does not match next - 1 - if pr.match != 0 || pr.next-1 != to { + if pr.match != 0 { + // the rejection must be stale if the progress has matched and "to" + // is smaller than "match". + if to <= pr.match { + return false + } + // directly decrease next to match + 1 + pr.next = pr.match + 1 + return true + } + + // the rejection must be stale if "to" does not match next - 1 + if pr.next-1 != to { return false } @@ -214,15 +228,28 @@ func (r *raft) sendAppend(to uint64) { m.LogTerm = r.raftLog.term(pr.next - 1) m.Entries = r.raftLog.entries(pr.next) m.Commit = r.raftLog.committed + // optimistically increase the next if the follower + // has been matched. + if n := len(m.Entries); pr.match != 0 && n != 0 { + pr.optimisticUpdate(m.Entries[n-1].Index) + } } r.send(m) } // sendHeartbeat sends an empty MsgApp func (r *raft) sendHeartbeat(to uint64) { + // Attach the commit as min(to.matched, r.committed). + // When the leader sends out heartbeat message, + // the receiver(follower) might not be matched with the leader + // or it might not have all the committed entries. + // The leader MUST NOT forward the follower's commit to + // an unmatched index. + commit := min(r.prs[to].match, r.raftLog.committed) m := pb.Message{ - To: to, - Type: pb.MsgApp, + To: to, + Type: pb.MsgApp, + Commit: commit, } r.send(m) } @@ -397,6 +424,10 @@ func (r *raft) handleAppendEntries(m pb.Message) { } } +func (r *raft) handleHeartbeat(m pb.Message) { + r.raftLog.commitTo(m.Commit) +} + func (r *raft) handleSnapshot(m pb.Message) { if r.restore(m.Snapshot) { r.snapshot = &m.Snapshot @@ -493,7 +524,11 @@ func stepFollower(r *raft, m pb.Message) { case pb.MsgApp: r.elapsed = 0 r.lead = m.From - r.handleAppendEntries(m) + if m.LogTerm == 0 && m.Index == 0 && len(m.Entries) == 0 { + r.handleHeartbeat(m) + } else { + r.handleAppendEntries(m) + } case pb.MsgSnap: r.elapsed = 0 r.handleSnapshot(m) diff --git a/raft/raft_paper_test.go b/raft/raft_paper_test.go index 690e73836..92167386f 100644 --- a/raft/raft_paper_test.go +++ b/raft/raft_paper_test.go @@ -597,11 +597,10 @@ func TestFollowerCheckMsgApp(t *testing.T) { index uint64 wreject bool }{ - {0, 0, false}, {ents[0].Term, ents[0].Index, false}, - {ents[1].Term, ents[1].Index, false}, {ents[0].Term, ents[0].Index + 1, true}, {ents[0].Term + 1, ents[0].Index, true}, + {ents[1].Term, ents[1].Index, false}, {3, 3, true}, } for i, tt := range tests { diff --git a/raft/raft_test.go b/raft/raft_test.go index 7cf667103..d53ca1d08 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -64,8 +64,18 @@ func TestProgressMaybeDecr(t *testing.T) { 1, 0, 0, false, 0, }, { - // match != 0 is always false - 5, 10, 9, false, 10, + // match != 0 and to is greater than match + // directly decrease to match+1 + 5, 10, 5, false, 10, + }, + { + // match != 0 and to is greater than match + // directly decrease to match+1 + 5, 10, 4, false, 10, + }, + { + // match != 0 and to is not greater than match + 5, 10, 9, true, 6, }, { // next-1 != to is always false @@ -664,6 +674,37 @@ func TestHandleMsgApp(t *testing.T) { } } +// TestHandleHeartbeat ensures that the follower commits to the commit in the message. +func TestHandleHeartbeat(t *testing.T) { + commit := uint64(2) + tests := []struct { + m pb.Message + wCommit uint64 + }{ + {pb.Message{Type: pb.MsgApp, Term: 2, Commit: commit + 1}, commit + 1}, + {pb.Message{Type: pb.MsgApp, Term: 2, Commit: commit - 1}, commit}, // do not decrease commit + } + + for i, tt := range tests { + storage := NewMemoryStorage() + storage.Append([]pb.Entry{{Term: 1}, {Term: 2}, {Term: 3}}) + sm := &raft{ + state: StateFollower, + HardState: pb.HardState{Term: 2}, + raftLog: newLog(storage), + } + sm.raftLog.commitTo(commit) + sm.handleHeartbeat(tt.m) + if sm.raftLog.committed != tt.wCommit { + t.Errorf("#%d: committed = %d, want %d", i, sm.raftLog.committed, tt.wCommit) + } + m := sm.readMessages() + if len(m) != 0 { + t.Fatalf("#%d: msg = nil, want 0", i) + } + } +} + func TestRecvMsgVote(t *testing.T) { tests := []struct { state StateType @@ -836,7 +877,7 @@ func TestAllServerStepdown(t *testing.T) { } func TestLeaderAppResp(t *testing.T) { - // initial progress: match = 0; netx = 3 + // initial progress: match = 0; next = 3 tests := []struct { index uint64 reject bool @@ -850,7 +891,7 @@ func TestLeaderAppResp(t *testing.T) { }{ {3, true, 0, 3, 0, 0, 0}, // stale resp; no replies {2, true, 0, 2, 1, 1, 0}, // denied resp; leader does not commit; decrese next and send probing msg - {2, false, 2, 3, 2, 2, 2}, // accept resp; leader commits; broadcast with commit index + {2, false, 2, 4, 2, 2, 2}, // accept resp; leader commits; broadcast with commit index {0, false, 0, 3, 0, 0, 0}, // ignore heartbeat replies } @@ -913,13 +954,20 @@ func TestBcastBeat(t *testing.T) { for i := 0; i < 10; i++ { sm.appendEntry(pb.Entry{}) } + // slow follower + sm.prs[2].match, sm.prs[2].next = 5, 6 + // normal follower + sm.prs[3].match, sm.prs[3].next = sm.raftLog.lastIndex(), sm.raftLog.lastIndex()+1 sm.Step(pb.Message{Type: pb.MsgBeat}) msgs := sm.readMessages() if len(msgs) != 2 { t.Fatalf("len(msgs) = %v, want 2", len(msgs)) } - tomap := map[uint64]bool{2: true, 3: true} + wantCommitMap := map[uint64]uint64{ + 2: min(sm.raftLog.committed, sm.prs[2].match), + 3: min(sm.raftLog.committed, sm.prs[3].match), + } for i, m := range msgs { if m.Type != pb.MsgApp { t.Fatalf("#%d: type = %v, want = %v", i, m.Type, pb.MsgApp) @@ -930,10 +978,13 @@ func TestBcastBeat(t *testing.T) { if m.LogTerm != 0 { t.Fatalf("#%d: prevTerm = %d, want %d", i, m.LogTerm, 0) } - if !tomap[m.To] { + if wantCommitMap[m.To] == 0 { t.Fatalf("#%d: unexpected to %d", i, m.To) } else { - delete(tomap, m.To) + if m.Commit != wantCommitMap[m.To] { + t.Fatalf("#%d: commit = %d, want %d", i, m.Commit, wantCommitMap[m.To]) + } + delete(wantCommitMap, m.To) } if len(m.Entries) != 0 { t.Fatalf("#%d: len(entries) = %d, want 0", i, len(m.Entries)) @@ -980,6 +1031,37 @@ func TestRecvMsgBeat(t *testing.T) { } } +func TestLeaderIncreaseNext(t *testing.T) { + previousEnts := []pb.Entry{{Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3}} + tests := []struct { + // progress + match uint64 + next uint64 + + wnext uint64 + }{ + // match is not zero, optimistically increase next + // previous entries + noop entry + propose + 1 + {1, 2, uint64(len(previousEnts) + 1 + 1 + 1)}, + // match is zero, not optimistically increase next + {0, 2, 2}, + } + + for i, tt := range tests { + sm := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage()) + sm.raftLog.append(0, previousEnts...) + sm.becomeCandidate() + sm.becomeLeader() + sm.prs[2].match, sm.prs[2].next = tt.match, tt.next + sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) + + p := sm.prs[2] + if p.next != tt.wnext { + t.Errorf("#%d next = %d, want %d", i, p.next, tt.wnext) + } + } +} + func TestRestore(t *testing.T) { s := pb.Snapshot{ Metadata: pb.SnapshotMetadata{ diff --git a/rafthttp/http.go b/rafthttp/http.go new file mode 100644 index 000000000..87ff9f924 --- /dev/null +++ b/rafthttp/http.go @@ -0,0 +1,90 @@ +/* + Copyright 2014 CoreOS, Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package rafthttp + +import ( + "io/ioutil" + "log" + "net/http" + + "github.com/coreos/etcd/pkg/types" + "github.com/coreos/etcd/raft/raftpb" + + "github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context" +) + +type Processor interface { + Process(ctx context.Context, m raftpb.Message) error +} + +func NewHandler(p Processor, cid types.ID) http.Handler { + return &handler{ + p: p, + cid: cid, + } +} + +type handler struct { + p Processor + cid types.ID +} + +func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if r.Method != "POST" { + w.Header().Set("Allow", "POST") + http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed) + return + } + + wcid := h.cid.String() + w.Header().Set("X-Etcd-Cluster-ID", wcid) + + gcid := r.Header.Get("X-Etcd-Cluster-ID") + if gcid != wcid { + log.Printf("rafthttp: request ignored due to cluster ID mismatch got %s want %s", gcid, wcid) + http.Error(w, "clusterID mismatch", http.StatusPreconditionFailed) + return + } + + b, err := ioutil.ReadAll(r.Body) + if err != nil { + log.Println("rafthttp: error reading raft message:", err) + http.Error(w, "error reading raft message", http.StatusBadRequest) + return + } + var m raftpb.Message + if err := m.Unmarshal(b); err != nil { + log.Println("rafthttp: error unmarshaling raft message:", err) + http.Error(w, "error unmarshaling raft message", http.StatusBadRequest) + return + } + if err := h.p.Process(context.TODO(), m); err != nil { + switch v := err.(type) { + case writerToResponse: + v.WriteTo(w) + default: + log.Printf("rafthttp: error processing raft message: %v", err) + http.Error(w, "error processing raft message", http.StatusInternalServerError) + } + return + } + w.WriteHeader(http.StatusNoContent) +} + +type writerToResponse interface { + WriteTo(w http.ResponseWriter) +} diff --git a/rafthttp/http_test.go b/rafthttp/http_test.go new file mode 100644 index 000000000..e884d0118 --- /dev/null +++ b/rafthttp/http_test.go @@ -0,0 +1,180 @@ +/* + Copyright 2014 CoreOS, Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package rafthttp + +import ( + "bytes" + "errors" + "io" + "net/http" + "net/http/httptest" + "strings" + "testing" + + "github.com/coreos/etcd/pkg/pbutil" + "github.com/coreos/etcd/pkg/types" + "github.com/coreos/etcd/raft/raftpb" + + "github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context" +) + +func TestServeRaft(t *testing.T) { + testCases := []struct { + method string + body io.Reader + p Processor + clusterID string + + wcode int + }{ + { + // bad method + "GET", + bytes.NewReader( + pbutil.MustMarshal(&raftpb.Message{}), + ), + &nopProcessor{}, + "0", + http.StatusMethodNotAllowed, + }, + { + // bad method + "PUT", + bytes.NewReader( + pbutil.MustMarshal(&raftpb.Message{}), + ), + &nopProcessor{}, + "0", + http.StatusMethodNotAllowed, + }, + { + // bad method + "DELETE", + bytes.NewReader( + pbutil.MustMarshal(&raftpb.Message{}), + ), + &nopProcessor{}, + "0", + http.StatusMethodNotAllowed, + }, + { + // bad request body + "POST", + &errReader{}, + &nopProcessor{}, + "0", + http.StatusBadRequest, + }, + { + // bad request protobuf + "POST", + strings.NewReader("malformed garbage"), + &nopProcessor{}, + "0", + http.StatusBadRequest, + }, + { + // good request, wrong cluster ID + "POST", + bytes.NewReader( + pbutil.MustMarshal(&raftpb.Message{}), + ), + &nopProcessor{}, + "1", + http.StatusPreconditionFailed, + }, + { + // good request, Processor failure + "POST", + bytes.NewReader( + pbutil.MustMarshal(&raftpb.Message{}), + ), + &errProcessor{ + err: &resWriterToError{code: http.StatusForbidden}, + }, + "0", + http.StatusForbidden, + }, + { + // good request, Processor failure + "POST", + bytes.NewReader( + pbutil.MustMarshal(&raftpb.Message{}), + ), + &errProcessor{ + err: &resWriterToError{code: http.StatusInternalServerError}, + }, + "0", + http.StatusInternalServerError, + }, + { + // good request, Processor failure + "POST", + bytes.NewReader( + pbutil.MustMarshal(&raftpb.Message{}), + ), + &errProcessor{err: errors.New("blah")}, + "0", + http.StatusInternalServerError, + }, + { + // good request + "POST", + bytes.NewReader( + pbutil.MustMarshal(&raftpb.Message{}), + ), + &nopProcessor{}, + "0", + http.StatusNoContent, + }, + } + for i, tt := range testCases { + req, err := http.NewRequest(tt.method, "foo", tt.body) + if err != nil { + t.Fatalf("#%d: could not create request: %#v", i, err) + } + req.Header.Set("X-Etcd-Cluster-ID", tt.clusterID) + rw := httptest.NewRecorder() + h := NewHandler(tt.p, types.ID(0)) + h.ServeHTTP(rw, req) + if rw.Code != tt.wcode { + t.Errorf("#%d: got code=%d, want %d", i, rw.Code, tt.wcode) + } + } +} + +// errReader implements io.Reader to facilitate a broken request. +type errReader struct{} + +func (er *errReader) Read(_ []byte) (int, error) { return 0, errors.New("some error") } + +type nopProcessor struct{} + +func (p *nopProcessor) Process(ctx context.Context, m raftpb.Message) error { return nil } + +type errProcessor struct { + err error +} + +func (p *errProcessor) Process(ctx context.Context, m raftpb.Message) error { return p.err } + +type resWriterToError struct { + code int +} + +func (e *resWriterToError) Error() string { return "" } +func (e *resWriterToError) WriteTo(w http.ResponseWriter) { w.WriteHeader(e.code) } diff --git a/etcdserver/sender.go b/rafthttp/sender.go similarity index 56% rename from etcdserver/sender.go rename to rafthttp/sender.go index 875c48bde..36d8ceb92 100644 --- a/etcdserver/sender.go +++ b/rafthttp/sender.go @@ -14,138 +14,38 @@ limitations under the License. */ -package etcdserver +package rafthttp import ( "bytes" "fmt" "log" "net/http" - "net/url" - "path" "sync" "time" "github.com/coreos/etcd/etcdserver/stats" + "github.com/coreos/etcd/pkg/pbutil" "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/raft/raftpb" ) const ( - raftPrefix = "/raft" connPerSender = 4 senderBufSize = connPerSender * 4 ) -type sendHub struct { - tr http.RoundTripper - cl ClusterInfo - ss *stats.ServerStats - ls *stats.LeaderStats - senders map[types.ID]*sender - shouldstop chan struct{} +type Sender interface { + Update(u string) + // Send sends the data to the remote node. It is always non-blocking. + // It may be fail to send data if it returns nil error. + Send(m raftpb.Message) error + // Stop performs any necessary finalization and terminates the Sender + // elegantly. + Stop() } -// newSendHub creates the default send hub used to transport raft messages -// to other members. The returned sendHub will update the given ServerStats and -// LeaderStats appropriately. -func newSendHub(t http.RoundTripper, cl ClusterInfo, ss *stats.ServerStats, ls *stats.LeaderStats) *sendHub { - h := &sendHub{ - tr: t, - cl: cl, - ss: ss, - ls: ls, - senders: make(map[types.ID]*sender), - shouldstop: make(chan struct{}, 1), - } - for _, m := range cl.Members() { - h.Add(m) - } - return h -} - -func (h *sendHub) Send(msgs []raftpb.Message) { - for _, m := range msgs { - to := types.ID(m.To) - s, ok := h.senders[to] - if !ok { - if !h.cl.IsIDRemoved(to) { - log.Printf("etcdserver: send message to unknown receiver %s", to) - } - continue - } - - // TODO: don't block. we should be able to have 1000s - // of messages out at a time. - data, err := m.Marshal() - if err != nil { - log.Println("sender: dropping message:", err) - return // drop bad message - } - if m.Type == raftpb.MsgApp { - h.ss.SendAppendReq(len(data)) - } - - // TODO (xiangli): reasonable retry logic - s.send(data) - } -} - -func (h *sendHub) Stop() { - for _, s := range h.senders { - s.stop() - } -} - -func (h *sendHub) ShouldStopNotify() <-chan struct{} { - return h.shouldstop -} - -func (h *sendHub) Add(m *Member) { - if _, ok := h.senders[m.ID]; ok { - return - } - // TODO: considering how to switch between all available peer urls - u := fmt.Sprintf("%s%s", m.PickPeerURL(), raftPrefix) - fs := h.ls.Follower(m.ID.String()) - s := newSender(h.tr, u, h.cl.ID(), fs, h.shouldstop) - h.senders[m.ID] = s -} - -func (h *sendHub) Remove(id types.ID) { - h.senders[id].stop() - delete(h.senders, id) -} - -func (h *sendHub) Update(m *Member) { - // TODO: return error or just panic? - if _, ok := h.senders[m.ID]; !ok { - return - } - peerURL := m.PickPeerURL() - u, err := url.Parse(peerURL) - if err != nil { - log.Panicf("unexpect peer url %s", peerURL) - } - u.Path = path.Join(u.Path, raftPrefix) - s := h.senders[m.ID] - s.mu.Lock() - defer s.mu.Unlock() - s.u = u.String() -} - -type sender struct { - tr http.RoundTripper - u string - cid types.ID - fs *stats.FollowerStats - q chan []byte - mu sync.RWMutex - wg sync.WaitGroup - shouldstop chan struct{} -} - -func newSender(tr http.RoundTripper, u string, cid types.ID, fs *stats.FollowerStats, shouldstop chan struct{}) *sender { +func NewSender(tr http.RoundTripper, u string, cid types.ID, fs *stats.FollowerStats, shouldstop chan struct{}) *sender { s := &sender{ tr: tr, u: u, @@ -161,7 +61,28 @@ func newSender(tr http.RoundTripper, u string, cid types.ID, fs *stats.FollowerS return s } -func (s *sender) send(data []byte) error { +type sender struct { + tr http.RoundTripper + u string + cid types.ID + fs *stats.FollowerStats + q chan []byte + mu sync.RWMutex + wg sync.WaitGroup + shouldstop chan struct{} +} + +func (s *sender) Update(u string) { + s.mu.Lock() + defer s.mu.Unlock() + s.u = u +} + +// TODO (xiangli): reasonable retry logic +func (s *sender) Send(m raftpb.Message) error { + // TODO: don't block. we should be able to have 1000s + // of messages out at a time. + data := pbutil.MustMarshal(&m) select { case s.q <- data: return nil @@ -171,7 +92,7 @@ func (s *sender) send(data []byte) error { } } -func (s *sender) stop() { +func (s *sender) Stop() { close(s.q) s.wg.Wait() } diff --git a/etcdserver/sender_test.go b/rafthttp/sender_test.go similarity index 65% rename from etcdserver/sender_test.go rename to rafthttp/sender_test.go index e24637093..a908d3838 100644 --- a/etcdserver/sender_test.go +++ b/rafthttp/sender_test.go @@ -14,7 +14,7 @@ limitations under the License. */ -package etcdserver +package rafthttp import ( "errors" @@ -22,109 +22,24 @@ import ( "net/http" "sync" "testing" - "time" "github.com/coreos/etcd/etcdserver/stats" "github.com/coreos/etcd/pkg/testutil" "github.com/coreos/etcd/pkg/types" + "github.com/coreos/etcd/raft/raftpb" ) -func TestSendHubInitSenders(t *testing.T) { - membs := []*Member{ - newTestMember(1, []string{"http://a"}, "", nil), - newTestMember(2, []string{"http://b"}, "", nil), - newTestMember(3, []string{"http://c"}, "", nil), - } - cl := newTestCluster(membs) - ls := stats.NewLeaderStats("") - h := newSendHub(nil, cl, nil, ls) - - ids := cl.MemberIDs() - if len(h.senders) != len(ids) { - t.Errorf("len(ids) = %d, want %d", len(h.senders), len(ids)) - } - for _, id := range ids { - if _, ok := h.senders[id]; !ok { - t.Errorf("senders[%s] is nil, want exists", id) - } - } -} - -func TestSendHubAdd(t *testing.T) { - cl := newTestCluster(nil) - ls := stats.NewLeaderStats("") - h := newSendHub(nil, cl, nil, ls) - m := newTestMember(1, []string{"http://a"}, "", nil) - h.Add(m) - - if _, ok := ls.Followers["1"]; !ok { - t.Errorf("FollowerStats[1] is nil, want exists") - } - s, ok := h.senders[types.ID(1)] - if !ok { - t.Fatalf("senders[1] is nil, want exists") - } - if s.u != "http://a/raft" { - t.Errorf("url = %s, want %s", s.u, "http://a/raft") - } - - h.Add(m) - ns := h.senders[types.ID(1)] - if s != ns { - t.Errorf("sender = %p, want %p", ns, s) - } -} - -func TestSendHubRemove(t *testing.T) { - membs := []*Member{ - newTestMember(1, []string{"http://a"}, "", nil), - } - cl := newTestCluster(membs) - ls := stats.NewLeaderStats("") - h := newSendHub(nil, cl, nil, ls) - h.Remove(types.ID(1)) - - if _, ok := h.senders[types.ID(1)]; ok { - t.Fatalf("senders[1] exists, want removed") - } -} - -func TestSendHubShouldStop(t *testing.T) { - membs := []*Member{ - newTestMember(1, []string{"http://a"}, "", nil), - } - tr := newRespRoundTripper(http.StatusForbidden, nil) - cl := newTestCluster(membs) - ls := stats.NewLeaderStats("") - h := newSendHub(tr, cl, nil, ls) - - shouldstop := h.ShouldStopNotify() - select { - case <-shouldstop: - t.Fatalf("received unexpected shouldstop notification") - case <-time.After(10 * time.Millisecond): - } - h.senders[1].send([]byte("somedata")) - - testutil.ForceGosched() - select { - case <-shouldstop: - default: - t.Fatalf("cannot receive stop notification") - } -} - // TestSenderSend tests that send func could post data using roundtripper // and increase success count in stats. func TestSenderSend(t *testing.T) { tr := &roundTripperRecorder{} fs := &stats.FollowerStats{} - s := newSender(tr, "http://10.0.0.1", types.ID(1), fs, nil) + s := NewSender(tr, "http://10.0.0.1", types.ID(1), fs, nil) - if err := s.send([]byte("some data")); err != nil { + if err := s.Send(raftpb.Message{}); err != nil { t.Fatalf("unexpect send error: %v", err) } - s.stop() + s.Stop() if tr.Request() == nil { t.Errorf("sender fails to post the data") @@ -139,12 +54,12 @@ func TestSenderSend(t *testing.T) { func TestSenderExceedMaximalServing(t *testing.T) { tr := newRoundTripperBlocker() fs := &stats.FollowerStats{} - s := newSender(tr, "http://10.0.0.1", types.ID(1), fs, nil) + s := NewSender(tr, "http://10.0.0.1", types.ID(1), fs, nil) // keep the sender busy and make the buffer full // nothing can go out as we block the sender for i := 0; i < connPerSender+senderBufSize; i++ { - if err := s.send([]byte("some data")); err != nil { + if err := s.Send(raftpb.Message{}); err != nil { t.Errorf("send err = %v, want nil", err) } // force the sender to grab data @@ -152,7 +67,7 @@ func TestSenderExceedMaximalServing(t *testing.T) { } // try to send a data when we are sure the buffer is full - if err := s.send([]byte("some data")); err == nil { + if err := s.Send(raftpb.Message{}); err == nil { t.Errorf("unexpect send success") } @@ -161,22 +76,22 @@ func TestSenderExceedMaximalServing(t *testing.T) { testutil.ForceGosched() // It could send new data after previous ones succeed - if err := s.send([]byte("some data")); err != nil { + if err := s.Send(raftpb.Message{}); err != nil { t.Errorf("send err = %v, want nil", err) } - s.stop() + s.Stop() } // TestSenderSendFailed tests that when send func meets the post error, // it increases fail count in stats. func TestSenderSendFailed(t *testing.T) { fs := &stats.FollowerStats{} - s := newSender(newRespRoundTripper(0, errors.New("blah")), "http://10.0.0.1", types.ID(1), fs, nil) + s := NewSender(newRespRoundTripper(0, errors.New("blah")), "http://10.0.0.1", types.ID(1), fs, nil) - if err := s.send([]byte("some data")); err != nil { - t.Fatalf("unexpect send error: %v", err) + if err := s.Send(raftpb.Message{}); err != nil { + t.Fatalf("unexpect Send error: %v", err) } - s.stop() + s.Stop() fs.Lock() defer fs.Unlock() @@ -187,11 +102,11 @@ func TestSenderSendFailed(t *testing.T) { func TestSenderPost(t *testing.T) { tr := &roundTripperRecorder{} - s := newSender(tr, "http://10.0.0.1", types.ID(1), nil, nil) + s := NewSender(tr, "http://10.0.0.1", types.ID(1), nil, nil) if err := s.post([]byte("some data")); err != nil { t.Fatalf("unexpect post error: %v", err) } - s.stop() + s.Stop() if g := tr.Request().Method; g != "POST" { t.Errorf("method = %s, want %s", g, "POST") @@ -230,9 +145,9 @@ func TestSenderPostBad(t *testing.T) { } for i, tt := range tests { shouldstop := make(chan struct{}) - s := newSender(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), nil, shouldstop) + s := NewSender(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), nil, shouldstop) err := s.post([]byte("some data")) - s.stop() + s.Stop() if err == nil { t.Errorf("#%d: err = nil, want not nil", i) @@ -251,9 +166,9 @@ func TestSenderPostShouldStop(t *testing.T) { } for i, tt := range tests { shouldstop := make(chan struct{}, 1) - s := newSender(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), nil, shouldstop) + s := NewSender(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), nil, shouldstop) s.post([]byte("some data")) - s.stop() + s.Stop() select { case <-shouldstop: default: diff --git a/scripts/build-docker b/scripts/build-docker index 37931f4ac..986032ec1 100755 --- a/scripts/build-docker +++ b/scripts/build-docker @@ -6,7 +6,7 @@ FROM scratch ADD etcd / ADD etcdctl / EXPOSE 4001 7001 2379 2380 -CMD ["/etcd"] +ENTRYPOINT ["/etcd"] DF -docker build . +docker build -t quay.io/coreos/etcd:${1} . diff --git a/scripts/build-release b/scripts/build-release index 740ac0e4c..5e719d24a 100755 --- a/scripts/build-release +++ b/scripts/build-release @@ -35,7 +35,7 @@ function package { if [ -d ${ccdir} ]; then srcdir=${ccdir} fi - for bin in etcd etcdctl; do + for bin in etcd etcdctl etcd-migrate; do cp ${srcdir}/${bin} ${target} done diff --git a/test b/test index d46548fe8..41a366388 100755 --- a/test +++ b/test @@ -15,7 +15,7 @@ COVER=${COVER:-"-cover"} source ./build # Hack: gofmt ./ will recursively check the .git directory. So use *.go for gofmt. -TESTABLE_AND_FORMATTABLE="client discovery error etcdctl/command etcdmain etcdserver etcdserver/etcdhttp etcdserver/etcdhttp/httptypes etcdserver/etcdserverpb integration migrate pkg/flags pkg/types pkg/transport pkg/wait proxy raft snap store wal" +TESTABLE_AND_FORMATTABLE="client discovery error etcdctl/command etcdmain etcdserver etcdserver/etcdhttp etcdserver/etcdhttp/httptypes etcdserver/etcdserverpb integration migrate pkg/flags pkg/types pkg/transport pkg/wait proxy raft rafthttp snap store wal" FORMATTABLE="$TESTABLE_AND_FORMATTABLE *.go etcdctl/" # user has not provided PKG override