Merge pull request #9211 from gyuho/raftsnap

*: rename "snap" to "raftsnap"
release-3.4
Gyuho Lee 2018-01-24 11:19:40 -08:00 committed by GitHub
commit 7331949a47
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
28 changed files with 95 additions and 78 deletions

View File

@ -19,6 +19,7 @@
- Move `cmd/vendor` directory to `vendor` at repository root.
- Remove recursive symlinks in `cmd` directory.
- Now `go get/install/build` on `etcd` packages (e.g. `clientv3`, `tools/benchmark`) enforce builds with etcd `vendor` directory.
- Rename `"github.com/coreos/etcd/snap"` to [`"github.com/coreos/etcd/internal/raftsnap"`](https://github.com/coreos/etcd/pull/9211).
### Added(API)

View File

@ -21,7 +21,7 @@ import (
"log"
"sync"
"github.com/coreos/etcd/snap"
"github.com/coreos/etcd/internal/raftsnap"
)
// a key-value store backed by raft
@ -29,7 +29,7 @@ type kvstore struct {
proposeC chan<- string // channel for proposing updates
mu sync.RWMutex
kvStore map[string]string // current committed key-value pairs
snapshotter *snap.Snapshotter
snapshotter *raftsnap.Snapshotter
}
type kv struct {
@ -37,7 +37,7 @@ type kv struct {
Val string
}
func newKVStore(snapshotter *snap.Snapshotter, proposeC chan<- string, commitC <-chan *string, errorC <-chan error) *kvstore {
func newKVStore(snapshotter *raftsnap.Snapshotter, proposeC chan<- string, commitC <-chan *string, errorC <-chan error) *kvstore {
s := &kvstore{proposeC: proposeC, kvStore: make(map[string]string), snapshotter: snapshotter}
// replay log into key-value map
s.readCommits(commitC, errorC)
@ -67,10 +67,10 @@ func (s *kvstore) readCommits(commitC <-chan *string, errorC <-chan error) {
// done replaying log; new data incoming
// OR signaled to load snapshot
snapshot, err := s.snapshotter.Load()
if err == snap.ErrNoSnapshot {
if err == raftsnap.ErrNoSnapshot {
return
}
if err != nil && err != snap.ErrNoSnapshot {
if err != nil && err != raftsnap.ErrNoSnapshot {
log.Panic(err)
}
log.Printf("loading snapshot at term %d and index %d", snapshot.Metadata.Term, snapshot.Metadata.Index)

View File

@ -25,12 +25,12 @@ import (
"time"
"github.com/coreos/etcd/etcdserver/stats"
"github.com/coreos/etcd/internal/raftsnap"
"github.com/coreos/etcd/pkg/fileutil"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/rafthttp"
"github.com/coreos/etcd/snap"
"github.com/coreos/etcd/wal"
"github.com/coreos/etcd/wal/walpb"
)
@ -59,8 +59,8 @@ type raftNode struct {
raftStorage *raft.MemoryStorage
wal *wal.WAL
snapshotter *snap.Snapshotter
snapshotterReady chan *snap.Snapshotter // signals when snapshotter is ready
snapshotter *raftsnap.Snapshotter
snapshotterReady chan *raftsnap.Snapshotter // signals when snapshotter is ready
snapCount uint64
transport *rafthttp.Transport
@ -77,7 +77,7 @@ var defaultSnapCount uint64 = 10000
// commit channel, followed by a nil message (to indicate the channel is
// current), then new log entries. To shutdown, close proposeC and read errorC.
func newRaftNode(id int, peers []string, join bool, getSnapshot func() ([]byte, error), proposeC <-chan string,
confChangeC <-chan raftpb.ConfChange) (<-chan *string, <-chan error, <-chan *snap.Snapshotter) {
confChangeC <-chan raftpb.ConfChange) (<-chan *string, <-chan error, <-chan *raftsnap.Snapshotter) {
commitC := make(chan *string)
errorC := make(chan error)
@ -98,7 +98,7 @@ func newRaftNode(id int, peers []string, join bool, getSnapshot func() ([]byte,
httpstopc: make(chan struct{}),
httpdonec: make(chan struct{}),
snapshotterReady: make(chan *snap.Snapshotter, 1),
snapshotterReady: make(chan *raftsnap.Snapshotter, 1),
// rest of structure populated after WAL replay
}
go rc.startRaft()
@ -188,7 +188,7 @@ func (rc *raftNode) publishEntries(ents []raftpb.Entry) bool {
func (rc *raftNode) loadSnapshot() *raftpb.Snapshot {
snapshot, err := rc.snapshotter.Load()
if err != nil && err != snap.ErrNoSnapshot {
if err != nil && err != raftsnap.ErrNoSnapshot {
log.Fatalf("raftexample: error loading snapshot (%v)", err)
}
return snapshot
@ -261,7 +261,7 @@ func (rc *raftNode) startRaft() {
log.Fatalf("raftexample: cannot create dir for snapshot (%v)", err)
}
}
rc.snapshotter = snap.New(rc.snapdir)
rc.snapshotter = raftsnap.New(rc.snapdir)
rc.snapshotterReady <- rc.snapshotter
oldwal := wal.Exist(rc.waldir)

View File

@ -25,11 +25,11 @@ import (
"github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/etcdserver/membership"
"github.com/coreos/etcd/internal/raftsnap"
"github.com/coreos/etcd/pkg/fileutil"
"github.com/coreos/etcd/pkg/idutil"
"github.com/coreos/etcd/pkg/pbutil"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/snap"
"github.com/coreos/etcd/wal"
"github.com/coreos/etcd/wal/walpb"
@ -102,14 +102,14 @@ func handleBackup(c *cli.Context) error {
}
func saveSnap(destSnap, srcSnap string) (walsnap walpb.Snapshot) {
ss := snap.New(srcSnap)
ss := raftsnap.New(srcSnap)
snapshot, err := ss.Load()
if err != nil && err != snap.ErrNoSnapshot {
if err != nil && err != raftsnap.ErrNoSnapshot {
log.Fatal(err)
}
if snapshot != nil {
walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
newss := snap.New(destSnap)
newss := raftsnap.New(destSnap)
if err = newss.SaveSnap(*snapshot); err != nil {
log.Fatal(err)
}

View File

@ -30,13 +30,13 @@ import (
"github.com/coreos/etcd/etcdserver/api"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/etcdserver/membership"
"github.com/coreos/etcd/internal/raftsnap"
"github.com/coreos/etcd/mvcc"
"github.com/coreos/etcd/mvcc/backend"
"github.com/coreos/etcd/mvcc/mvccpb"
"github.com/coreos/etcd/pkg/pbutil"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/snap"
"github.com/coreos/etcd/store"
"github.com/coreos/etcd/wal"
"github.com/coreos/etcd/wal/walpb"
@ -134,9 +134,9 @@ func rebuildStoreV2() (store.Store, uint64) {
}
snapdir := filepath.Join(migrateDatadir, "member", "snap")
ss := snap.New(snapdir)
ss := raftsnap.New(snapdir)
snapshot, err := ss.Load()
if err != nil && err != snap.ErrNoSnapshot {
if err != nil && err != raftsnap.ErrNoSnapshot {
ExitWithError(ExitError, err)
}

View File

@ -19,11 +19,11 @@ import (
"os"
"time"
"github.com/coreos/etcd/internal/raftsnap"
"github.com/coreos/etcd/lease"
"github.com/coreos/etcd/mvcc"
"github.com/coreos/etcd/mvcc/backend"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/snap"
)
func newBackend(cfg ServerConfig) backend.Backend {
@ -37,7 +37,7 @@ func newBackend(cfg ServerConfig) backend.Backend {
}
// openSnapshotBackend renames a snapshot db to the current etcd db and opens it.
func openSnapshotBackend(cfg ServerConfig, ss *snap.Snapshotter, snapshot raftpb.Snapshot) (backend.Backend, error) {
func openSnapshotBackend(cfg ServerConfig, ss *raftsnap.Snapshotter, snapshot raftpb.Snapshot) (backend.Backend, error) {
snapPath, err := ss.DBFilePath(snapshot.Metadata.Index)
if err != nil {
return nil, fmt.Errorf("database snapshot file path error: %v", err)
@ -77,5 +77,5 @@ func recoverSnapshotBackend(cfg ServerConfig, oldbe backend.Backend, snapshot ra
return oldbe, nil
}
oldbe.Close()
return openSnapshotBackend(cfg, snap.New(cfg.SnapDir()), snapshot)
return openSnapshotBackend(cfg, raftsnap.New(cfg.SnapDir()), snapshot)
}

View File

@ -38,6 +38,7 @@ import (
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/etcdserver/membership"
"github.com/coreos/etcd/etcdserver/stats"
"github.com/coreos/etcd/internal/raftsnap"
"github.com/coreos/etcd/lease"
"github.com/coreos/etcd/lease/leasehttp"
"github.com/coreos/etcd/mvcc"
@ -52,7 +53,6 @@ import (
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/rafthttp"
"github.com/coreos/etcd/snap"
"github.com/coreos/etcd/store"
"github.com/coreos/etcd/version"
"github.com/coreos/etcd/wal"
@ -206,7 +206,7 @@ type EtcdServer struct {
cluster *membership.RaftCluster
store store.Store
snapshotter *snap.Snapshotter
snapshotter *raftsnap.Snapshotter
applyV2 ApplierV2
@ -279,7 +279,7 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
if err = fileutil.TouchDirAll(cfg.SnapDir()); err != nil {
plog.Fatalf("create snapshot directory error: %v", err)
}
ss := snap.New(cfg.SnapDir())
ss := raftsnap.New(cfg.SnapDir())
bepath := cfg.backendPath()
beExist := fileutil.Exist(bepath)
@ -373,7 +373,7 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
plog.Warningf("discovery token ignored since a cluster has already been initialized. Valid log found at %q", cfg.WALDir())
}
snapshot, err = ss.Load()
if err != nil && err != snap.ErrNoSnapshot {
if err != nil && err != raftsnap.ErrNoSnapshot {
return nil, err
}
if snapshot != nil {
@ -1266,7 +1266,7 @@ func (s *EtcdServer) publish(timeout time.Duration) {
}
}
func (s *EtcdServer) sendMergedSnap(merged snap.Message) {
func (s *EtcdServer) sendMergedSnap(merged raftsnap.Message) {
atomic.AddInt64(&s.inflightSnapshots, 1)
s.r.transport.SendSnapshot(merged)

View File

@ -28,6 +28,7 @@ import (
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/etcdserver/membership"
"github.com/coreos/etcd/internal/raftsnap"
"github.com/coreos/etcd/lease"
"github.com/coreos/etcd/mvcc"
"github.com/coreos/etcd/mvcc/backend"
@ -43,7 +44,6 @@ import (
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/rafthttp"
"github.com/coreos/etcd/snap"
"github.com/coreos/etcd/store"
)
@ -986,7 +986,7 @@ func TestSnapshotOrdering(t *testing.T) {
Cfg: ServerConfig{DataDir: testdir},
r: *r,
store: st,
snapshotter: snap.New(snapdir),
snapshotter: raftsnap.New(snapdir),
cluster: cl,
SyncTicker: &time.Ticker{},
}
@ -1111,7 +1111,7 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
Cfg: ServerConfig{DataDir: testdir},
r: *r,
store: st,
snapshotter: snap.New(testdir),
snapshotter: raftsnap.New(testdir),
cluster: cl,
SyncTicker: &time.Ticker{},
}

View File

@ -17,15 +17,15 @@ package etcdserver
import (
"io"
"github.com/coreos/etcd/internal/raftsnap"
"github.com/coreos/etcd/mvcc/backend"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/snap"
)
// createMergedSnapshotMessage creates a snapshot message that contains: raft status (term, conf),
// a snapshot of v2 store inside raft.Snapshot as []byte, a snapshot of v3 KV in the top level message
// as ReadCloser.
func (s *EtcdServer) createMergedSnapshotMessage(m raftpb.Message, snapt, snapi uint64, confState raftpb.ConfState) snap.Message {
func (s *EtcdServer) createMergedSnapshotMessage(m raftpb.Message, snapt, snapi uint64, confState raftpb.ConfState) raftsnap.Message {
// get a snapshot of v2 store as []byte
clone := s.store.Clone()
d, err := clone.SaveNoCopy()
@ -51,7 +51,7 @@ func (s *EtcdServer) createMergedSnapshotMessage(m raftpb.Message, snapt, snapi
}
m.Snapshot = snapshot
return *snap.NewMessage(m, rc, dbsnap.Size())
return *raftsnap.NewMessage(m, rc, dbsnap.Size())
}
func newSnapshotReaderCloser(snapshot backend.Snapshot) io.ReadCloser {

View File

@ -18,10 +18,10 @@ import (
"io"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/internal/raftsnap"
"github.com/coreos/etcd/pkg/pbutil"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/snap"
"github.com/coreos/etcd/wal"
"github.com/coreos/etcd/wal/walpb"
)
@ -38,10 +38,10 @@ type Storage interface {
type storage struct {
*wal.WAL
*snap.Snapshotter
*raftsnap.Snapshotter
}
func NewStorage(w *wal.WAL, s *snap.Snapshotter) Storage {
func NewStorage(w *wal.WAL, s *raftsnap.Snapshotter) Storage {
return &storage{w, s}
}

View File

@ -20,10 +20,10 @@ import (
"time"
"github.com/coreos/etcd/etcdserver/membership"
"github.com/coreos/etcd/internal/raftsnap"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/rafthttp"
"github.com/coreos/etcd/snap"
)
func TestLongestConnected(t *testing.T) {
@ -76,7 +76,7 @@ func newNopTransporterWithActiveTime(memberIDs []types.ID) rafthttp.Transporter
func (s *nopTransporterWithActiveTime) Start() error { return nil }
func (s *nopTransporterWithActiveTime) Handler() http.Handler { return nil }
func (s *nopTransporterWithActiveTime) Send(m []raftpb.Message) {}
func (s *nopTransporterWithActiveTime) SendSnapshot(m snap.Message) {}
func (s *nopTransporterWithActiveTime) SendSnapshot(m raftsnap.Message) {}
func (s *nopTransporterWithActiveTime) AddRemote(id types.ID, us []string) {}
func (s *nopTransporterWithActiveTime) AddPeer(id types.ID, us []string) {}
func (s *nopTransporterWithActiveTime) RemovePeer(id types.ID) {}

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package snap
package raftsnap
import (
"errors"

17
internal/raftsnap/doc.go Normal file
View File

@ -0,0 +1,17 @@
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package raftsnap handles Raft nodes' states with snapshots.
// The snapshot logic is internal to etcd server and raft package.
package raftsnap

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package snap
package raftsnap
import (
"io"

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package snap
package raftsnap
import "github.com/prometheus/client_golang/prometheus"

View File

@ -12,8 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
// Package snap stores raft nodes' states with snapshots.
package snap
package raftsnap
import (
"errors"
@ -26,11 +25,11 @@ import (
"strings"
"time"
"github.com/coreos/etcd/internal/raftsnap/snappb"
pioutil "github.com/coreos/etcd/pkg/ioutil"
"github.com/coreos/etcd/pkg/pbutil"
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/snap/snappb"
"github.com/coreos/pkg/capnslog"
)

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package snap
package raftsnap
import (
"fmt"

View File

@ -23,10 +23,10 @@ import (
"path"
"strings"
"github.com/coreos/etcd/internal/raftsnap"
pioutil "github.com/coreos/etcd/pkg/ioutil"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/snap"
"github.com/coreos/etcd/version"
)
@ -136,11 +136,11 @@ func (h *pipelineHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
type snapshotHandler struct {
tr Transporter
r Raft
snapshotter *snap.Snapshotter
snapshotter *raftsnap.Snapshotter
cid types.ID
}
func newSnapshotHandler(tr Transporter, r Raft, snapshotter *snap.Snapshotter, cid types.ID) http.Handler {
func newSnapshotHandler(tr Transporter, r Raft, snapshotter *raftsnap.Snapshotter, cid types.ID) http.Handler {
return &snapshotHandler{
tr: tr,
r: r,

View File

@ -26,10 +26,10 @@ import (
"testing"
"time"
"github.com/coreos/etcd/internal/raftsnap"
"github.com/coreos/etcd/pkg/pbutil"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/snap"
"github.com/coreos/etcd/version"
)
@ -356,7 +356,7 @@ func (pg *fakePeerGetter) Get(id types.ID) Peer { return pg.peers[id] }
type fakePeer struct {
msgs []raftpb.Message
snapMsgs []snap.Message
snapMsgs []raftsnap.Message
peerURLs types.URLs
connc chan *outgoingConn
paused bool
@ -377,7 +377,7 @@ func (pr *fakePeer) send(m raftpb.Message) {
pr.msgs = append(pr.msgs, m)
}
func (pr *fakePeer) sendSnap(m snap.Message) {
func (pr *fakePeer) sendSnap(m raftsnap.Message) {
if pr.paused {
return
}

View File

@ -20,10 +20,10 @@ import (
"time"
"github.com/coreos/etcd/etcdserver/stats"
"github.com/coreos/etcd/internal/raftsnap"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/snap"
"golang.org/x/time/rate"
)
@ -63,7 +63,7 @@ type Peer interface {
// sendSnap sends the merged snapshot message to the remote peer. Its behavior
// is similar to send.
sendSnap(m snap.Message)
sendSnap(m raftsnap.Message)
// update updates the urls of remote peer.
update(urls types.URLs)
@ -233,7 +233,7 @@ func (p *peer) send(m raftpb.Message) {
}
}
func (p *peer) sendSnap(m snap.Message) {
func (p *peer) sendSnap(m raftsnap.Message) {
go p.snapSender.send(m)
}

View File

@ -22,11 +22,11 @@ import (
"net/http"
"time"
"github.com/coreos/etcd/internal/raftsnap"
"github.com/coreos/etcd/pkg/httputil"
pioutil "github.com/coreos/etcd/pkg/ioutil"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/snap"
)
var (
@ -63,7 +63,7 @@ func newSnapshotSender(tr *Transport, picker *urlPicker, to types.ID, status *pe
func (s *snapshotSender) stop() { close(s.stopc) }
func (s *snapshotSender) send(merged snap.Message) {
func (s *snapshotSender) send(merged raftsnap.Message) {
m := merged.Message
body := createSnapBody(merged)
@ -142,7 +142,7 @@ func (s *snapshotSender) post(req *http.Request) (err error) {
}
}
func createSnapBody(merged snap.Message) io.ReadCloser {
func createSnapBody(merged raftsnap.Message) io.ReadCloser {
buf := new(bytes.Buffer)
enc := &messageEncoder{w: buf}
// encode raft message

View File

@ -25,9 +25,9 @@ import (
"testing"
"time"
"github.com/coreos/etcd/internal/raftsnap"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/snap"
)
type strReaderCloser struct{ *strings.Reader }
@ -82,7 +82,7 @@ func TestSnapshotSend(t *testing.T) {
}
for i, tt := range tests {
sent, files := testSnapshotSend(t, snap.NewMessage(tt.m, tt.rc, tt.size))
sent, files := testSnapshotSend(t, raftsnap.NewMessage(tt.m, tt.rc, tt.size))
if tt.wsent != sent {
t.Errorf("#%d: snapshot expected %v, got %v", i, tt.wsent, sent)
}
@ -92,7 +92,7 @@ func TestSnapshotSend(t *testing.T) {
}
}
func testSnapshotSend(t *testing.T, sm *snap.Message) (bool, []os.FileInfo) {
func testSnapshotSend(t *testing.T, sm *raftsnap.Message) (bool, []os.FileInfo) {
d, err := ioutil.TempDir(os.TempDir(), "snapdir")
if err != nil {
t.Fatal(err)
@ -102,7 +102,7 @@ func testSnapshotSend(t *testing.T, sm *snap.Message) (bool, []os.FileInfo) {
r := &fakeRaft{}
tr := &Transport{pipelineRt: &http.Transport{}, ClusterID: types.ID(1), Raft: r}
ch := make(chan struct{}, 1)
h := &syncHandler{newSnapshotHandler(tr, r, snap.New(d), types.ID(1)), ch}
h := &syncHandler{newSnapshotHandler(tr, r, raftsnap.New(d), types.ID(1)), ch}
srv := httptest.NewServer(h)
defer srv.Close()

View File

@ -21,12 +21,12 @@ import (
"time"
"github.com/coreos/etcd/etcdserver/stats"
"github.com/coreos/etcd/internal/raftsnap"
"github.com/coreos/etcd/pkg/logutil"
"github.com/coreos/etcd/pkg/transport"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/snap"
"github.com/coreos/pkg/capnslog"
"github.com/xiang90/probing"
@ -60,7 +60,7 @@ type Transporter interface {
Send(m []raftpb.Message)
// SendSnapshot sends out the given snapshot message to a remote peer.
// The behavior of SendSnapshot is similar to Send.
SendSnapshot(m snap.Message)
SendSnapshot(m raftsnap.Message)
// AddRemote adds a remote with given peer urls into the transport.
// A remote helps newly joined member to catch up the progress of cluster,
// and will not be used after that.
@ -107,7 +107,7 @@ type Transport struct {
URLs types.URLs // local peer URLs
ClusterID types.ID // raft cluster ID for request validation
Raft Raft // raft state machine, to which the Transport forwards received messages and reports status
Snapshotter *snap.Snapshotter
Snapshotter *raftsnap.Snapshotter
ServerStats *stats.ServerStats // used to record general transportation statistics
// used to record transportation statistics with followers when
// performing as leader in raft protocol
@ -346,7 +346,7 @@ func (t *Transport) ActiveSince(id types.ID) time.Time {
return time.Time{}
}
func (t *Transport) SendSnapshot(m snap.Message) {
func (t *Transport) SendSnapshot(m raftsnap.Message) {
t.mu.Lock()
defer t.mu.Unlock()
p := t.peers[types.ID(m.To)]
@ -384,7 +384,7 @@ func NewNopTransporter() Transporter {
func (s *nopTransporter) Start() error { return nil }
func (s *nopTransporter) Handler() http.Handler { return nil }
func (s *nopTransporter) Send(m []raftpb.Message) {}
func (s *nopTransporter) SendSnapshot(m snap.Message) {}
func (s *nopTransporter) SendSnapshot(m raftsnap.Message) {}
func (s *nopTransporter) AddRemote(id types.ID, us []string) {}
func (s *nopTransporter) AddPeer(id types.ID, us []string) {}
func (s *nopTransporter) RemovePeer(id types.ID) {}
@ -397,18 +397,18 @@ func (s *nopTransporter) Resume() {}
type snapTransporter struct {
nopTransporter
snapDoneC chan snap.Message
snapDoneC chan raftsnap.Message
snapDir string
}
func NewSnapTransporter(snapDir string) (Transporter, <-chan snap.Message) {
ch := make(chan snap.Message, 1)
func NewSnapTransporter(snapDir string) (Transporter, <-chan raftsnap.Message) {
ch := make(chan raftsnap.Message, 1)
tr := &snapTransporter{snapDoneC: ch, snapDir: snapDir}
return tr, ch
}
func (s *snapTransporter) SendSnapshot(m snap.Message) {
ss := snap.New(s.snapDir)
func (s *snapTransporter) SendSnapshot(m raftsnap.Message) {
ss := raftsnap.New(s.snapDir)
ss.SaveDBFrom(m.ReadCloser, m.Snapshot.Metadata.Index+1)
m.CloseWithError(nil)
s.snapDoneC <- m

View File

@ -16,7 +16,7 @@ if [[ $(protoc --version | cut -f2 -d' ') != "3.5.1" ]]; then
fi
# directories containing protos to be built
DIRS="./wal/walpb ./etcdserver/etcdserverpb ./snap/snappb ./raft/raftpb ./mvcc/mvccpb ./lease/leasepb ./auth/authpb ./etcdserver/api/v3lock/v3lockpb ./etcdserver/api/v3election/v3electionpb"
DIRS="./wal/walpb ./etcdserver/etcdserverpb ./raftsnap/snappb ./raft/raftpb ./mvcc/mvccpb ./lease/leasepb ./auth/authpb ./etcdserver/api/v3lock/v3lockpb ./etcdserver/api/v3election/v3electionpb"
# exact version of packages to build
GOGO_PROTO_SHA="41168f6614b7bb144818ec8967b8c702705df564"

View File

@ -30,6 +30,7 @@ import (
"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/etcdserver/membership"
"github.com/coreos/etcd/internal/raftsnap"
"github.com/coreos/etcd/lease"
"github.com/coreos/etcd/mvcc"
"github.com/coreos/etcd/mvcc/backend"
@ -38,7 +39,6 @@ import (
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/snap"
"github.com/coreos/etcd/store"
"github.com/coreos/etcd/wal"
"github.com/coreos/etcd/wal/walpb"
@ -420,7 +420,7 @@ func (s *v3Manager) saveWALAndSnap() error {
},
},
}
sn := snap.New(s.snapDir)
sn := raftsnap.New(s.snapDir)
if err := sn.SaveSnap(raftSnap); err != nil {
return err
}

View File

@ -22,10 +22,10 @@ import (
"time"
"github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/internal/raftsnap"
"github.com/coreos/etcd/pkg/pbutil"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/snap"
"github.com/coreos/etcd/wal"
"github.com/coreos/etcd/wal/walpb"
)
@ -57,10 +57,10 @@ func main() {
walsnap.Index = *index
} else {
if *snapfile == "" {
ss := snap.New(snapDir(dataDir))
ss := raftsnap.New(snapDir(dataDir))
snapshot, err = ss.Load()
} else {
snapshot, err = snap.Read(filepath.Join(snapDir(dataDir), *snapfile))
snapshot, err = raftsnap.Read(filepath.Join(snapDir(dataDir), *snapfile))
}
switch err {
@ -69,7 +69,7 @@ func main() {
nodes := genIDSlice(snapshot.Metadata.ConfState.Nodes)
fmt.Printf("Snapshot:\nterm=%d index=%d nodes=%s\n",
walsnap.Term, walsnap.Index, nodes)
case snap.ErrNoSnapshot:
case raftsnap.ErrNoSnapshot:
fmt.Printf("Snapshot:\nempty\n")
default:
log.Fatalf("Failed loading snapshot: %v", err)