diff --git a/rafthttp/snapshot_test.go b/rafthttp/snapshot_test.go new file mode 100644 index 000000000..fbf482d09 --- /dev/null +++ b/rafthttp/snapshot_test.go @@ -0,0 +1,145 @@ +// Copyright 2016 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rafthttp + +import ( + "fmt" + "io" + "io/ioutil" + "net/http" + "net/http/httptest" + "os" + "strings" + "testing" + "time" + + "github.com/coreos/etcd/pkg/types" + "github.com/coreos/etcd/raft/raftpb" + "github.com/coreos/etcd/snap" +) + +type strReaderCloser struct{ *strings.Reader } + +func (s strReaderCloser) Close() error { return nil } + +func TestSnapshotSend(t *testing.T) { + tests := []struct { + m raftpb.Message + rc io.ReadCloser + size int64 + + wsent bool + wfiles int + }{ + // sent and receive with no errors + { + m: raftpb.Message{Type: raftpb.MsgSnap, To: 1}, + rc: strReaderCloser{strings.NewReader("hello")}, + size: 5, + + wsent: true, + wfiles: 1, + }, + // error when reading snapshot for send + { + m: raftpb.Message{Type: raftpb.MsgSnap, To: 1}, + rc: &errReadCloser{fmt.Errorf("snapshot error")}, + size: 1, + + wsent: false, + wfiles: 0, + }, + // sends less than the given snapshot length + { + m: raftpb.Message{Type: raftpb.MsgSnap, To: 1}, + rc: strReaderCloser{strings.NewReader("hello")}, + size: 10000, + + wsent: false, + wfiles: 0, + }, + // sends less than actual snapshot length + { + m: raftpb.Message{Type: raftpb.MsgSnap, To: 1}, + rc: strReaderCloser{strings.NewReader("hello")}, + size: 1, + + wsent: false, + wfiles: 0, + }, + } + + for i, tt := range tests { + sent, files := testSnapshotSend(t, snap.NewMessage(tt.m, tt.rc, tt.size)) + if tt.wsent != sent { + t.Errorf("#%d: snapshot expected %v, got %v", i, tt.wsent, sent) + } + if tt.wfiles != len(files) { + t.Fatalf("#%d: expected %d files, got %d files", i, tt.wfiles, len(files)) + } + } +} + +func testSnapshotSend(t *testing.T, sm *snap.Message) (bool, []os.FileInfo) { + d, err := ioutil.TempDir(os.TempDir(), "snapdir") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(d) + + r := &fakeRaft{} + tr := &Transport{pipelineRt: &http.Transport{}, ClusterID: types.ID(1), Raft: r} + ch := make(chan struct{}, 1) + h := &syncHandler{newSnapshotHandler(tr, r, snap.New(d), types.ID(1)), ch} + srv := httptest.NewServer(h) + defer srv.Close() + + picker := mustNewURLPicker(t, []string{srv.URL}) + snapsend := newSnapshotSender(tr, picker, types.ID(1), newPeerStatus(types.ID(1))) + defer snapsend.stop() + + snapsend.send(*sm) + + sent := false + select { + case <-time.After(time.Second): + t.Fatalf("timed out sending snapshot") + case sent = <-sm.CloseNotify(): + } + + // wait for handler to finish accepting snapshot + <-ch + + files, rerr := ioutil.ReadDir(d) + if rerr != nil { + t.Fatal(rerr) + } + return sent, files +} + +type errReadCloser struct{ err error } + +func (s *errReadCloser) Read(p []byte) (int, error) { return 0, s.err } +func (s *errReadCloser) Close() error { return s.err } + +type syncHandler struct { + h http.Handler + ch chan<- struct{} +} + +func (sh *syncHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + sh.h.ServeHTTP(w, r) + sh.ch <- struct{}{} +}