// Copyright 2016 The etcd Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package rafthttp import ( "fmt" "io" "io/ioutil" "net/http" "net/http/httptest" "os" "strings" "testing" "time" "github.com/coreos/etcd/internal/raftsnap" "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/raft/raftpb" ) type strReaderCloser struct{ *strings.Reader } func (s strReaderCloser) Close() error { return nil } func TestSnapshotSend(t *testing.T) { tests := []struct { m raftpb.Message rc io.ReadCloser size int64 wsent bool wfiles int }{ // sent and receive with no errors { m: raftpb.Message{Type: raftpb.MsgSnap, To: 1}, rc: strReaderCloser{strings.NewReader("hello")}, size: 5, wsent: true, wfiles: 1, }, // error when reading snapshot for send { m: raftpb.Message{Type: raftpb.MsgSnap, To: 1}, rc: &errReadCloser{fmt.Errorf("snapshot error")}, size: 1, wsent: false, wfiles: 0, }, // sends less than the given snapshot length { m: raftpb.Message{Type: raftpb.MsgSnap, To: 1}, rc: strReaderCloser{strings.NewReader("hello")}, size: 10000, wsent: false, wfiles: 0, }, // sends less than actual snapshot length { m: raftpb.Message{Type: raftpb.MsgSnap, To: 1}, rc: strReaderCloser{strings.NewReader("hello")}, size: 1, wsent: false, wfiles: 0, }, } for i, tt := range tests { sent, files := testSnapshotSend(t, raftsnap.NewMessage(tt.m, tt.rc, tt.size)) if tt.wsent != sent { t.Errorf("#%d: snapshot expected %v, got %v", i, tt.wsent, sent) } if tt.wfiles != len(files) { t.Fatalf("#%d: expected %d files, got %d files", i, tt.wfiles, len(files)) } } } func testSnapshotSend(t *testing.T, sm *raftsnap.Message) (bool, []os.FileInfo) { d, err := ioutil.TempDir(os.TempDir(), "snapdir") if err != nil { t.Fatal(err) } defer os.RemoveAll(d) r := &fakeRaft{} tr := &Transport{pipelineRt: &http.Transport{}, ClusterID: types.ID(1), Raft: r} ch := make(chan struct{}, 1) h := &syncHandler{newSnapshotHandler(tr, r, raftsnap.New(d), types.ID(1)), ch} srv := httptest.NewServer(h) defer srv.Close() picker := mustNewURLPicker(t, []string{srv.URL}) snapsend := newSnapshotSender(tr, picker, types.ID(1), newPeerStatus(types.ID(1))) defer snapsend.stop() snapsend.send(*sm) sent := false select { case <-time.After(time.Second): t.Fatalf("timed out sending snapshot") case sent = <-sm.CloseNotify(): } // wait for handler to finish accepting snapshot <-ch files, rerr := ioutil.ReadDir(d) if rerr != nil { t.Fatal(rerr) } return sent, files } type errReadCloser struct{ err error } func (s *errReadCloser) Read(p []byte) (int, error) { return 0, s.err } func (s *errReadCloser) Close() error { return s.err } type syncHandler struct { h http.Handler ch chan<- struct{} } func (sh *syncHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { sh.h.ServeHTTP(w, r) sh.ch <- struct{}{} }