commit
3e5073e9be
|
@ -29,12 +29,12 @@ type Discoverer interface {
|
|||
|
||||
type discovery struct {
|
||||
cluster string
|
||||
id int64
|
||||
id uint64
|
||||
config string
|
||||
c client.Client
|
||||
}
|
||||
|
||||
func New(durl string, id int64, config string) (Discoverer, error) {
|
||||
func New(durl string, id uint64, config string) (Discoverer, error) {
|
||||
u, err := url.Parse(durl)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
|
@ -12,9 +12,9 @@ import (
|
|||
)
|
||||
|
||||
// Cluster is a list of Members that belong to the same raft cluster
|
||||
type Cluster map[int64]*Member
|
||||
type Cluster map[uint64]*Member
|
||||
|
||||
func (c Cluster) FindID(id int64) *Member {
|
||||
func (c Cluster) FindID(id uint64) *Member {
|
||||
return c[id]
|
||||
}
|
||||
|
||||
|
@ -49,7 +49,7 @@ func (c *Cluster) AddSlice(mems []Member) error {
|
|||
|
||||
// Pick chooses a random address from a given Member's addresses, and returns it as
|
||||
// an addressible URI. If the given member does not exist, an empty string is returned.
|
||||
func (c Cluster) Pick(id int64) string {
|
||||
func (c Cluster) Pick(id uint64) string {
|
||||
if m := c.FindID(id); m != nil {
|
||||
urls := m.PeerURLs
|
||||
if len(urls) == 0 {
|
||||
|
@ -95,12 +95,12 @@ func (c Cluster) String() string {
|
|||
return strings.Join(sl, ",")
|
||||
}
|
||||
|
||||
func (c Cluster) IDs() []int64 {
|
||||
var ids []int64
|
||||
func (c Cluster) IDs() []uint64 {
|
||||
var ids []uint64
|
||||
for _, m := range c {
|
||||
ids = append(ids, m.ID)
|
||||
}
|
||||
sort.Sort(types.Int64Slice(ids))
|
||||
sort.Sort(types.Uint64Slice(ids))
|
||||
return ids
|
||||
}
|
||||
|
||||
|
|
|
@ -18,7 +18,7 @@ const (
|
|||
type ClusterStore interface {
|
||||
Add(m Member)
|
||||
Get() Cluster
|
||||
Remove(id int64)
|
||||
Remove(id uint64)
|
||||
}
|
||||
|
||||
type clusterStore struct {
|
||||
|
@ -69,7 +69,7 @@ func (s *clusterStore) Get() Cluster {
|
|||
|
||||
// Remove removes a member from the store.
|
||||
// The given id MUST exist.
|
||||
func (s *clusterStore) Remove(id int64) {
|
||||
func (s *clusterStore) Remove(id uint64) {
|
||||
p := s.Get().FindID(id).storeKey()
|
||||
if _, err := s.Store.Delete(p, false, false); err != nil {
|
||||
log.Panicf("delete peer should never fail: %v", err)
|
||||
|
|
|
@ -90,7 +90,7 @@ func TestClusterPick(t *testing.T) {
|
|||
|
||||
func TestClusterFind(t *testing.T) {
|
||||
tests := []struct {
|
||||
id int64
|
||||
id uint64
|
||||
name string
|
||||
mems []Member
|
||||
match bool
|
||||
|
@ -207,7 +207,7 @@ func TestClusterIDs(t *testing.T) {
|
|||
{ID: 4},
|
||||
{ID: 100},
|
||||
})
|
||||
w := []int64{1, 4, 100}
|
||||
w := []uint64{1, 4, 100}
|
||||
g := cs.IDs()
|
||||
if !reflect.DeepEqual(w, g) {
|
||||
t.Errorf("IDs=%+v, want %+v", g, w)
|
||||
|
|
|
@ -13,7 +13,7 @@ type ServerConfig struct {
|
|||
DiscoveryURL string
|
||||
ClientURLs types.URLs
|
||||
DataDir string
|
||||
SnapCount int64
|
||||
SnapCount uint64
|
||||
Cluster *Cluster
|
||||
ClusterState ClusterState
|
||||
Transport *http.Transport
|
||||
|
|
|
@ -79,7 +79,7 @@ func (h serverHandler) serveKeys(w http.ResponseWriter, r *http.Request) {
|
|||
ctx, cancel := context.WithTimeout(context.Background(), h.timeout)
|
||||
defer cancel()
|
||||
|
||||
rr, err := parseRequest(r, etcdserver.GenID())
|
||||
rr, err := parseRequest(r, int64(etcdserver.GenID()))
|
||||
if err != nil {
|
||||
writeError(w, err)
|
||||
return
|
||||
|
|
|
@ -501,8 +501,8 @@ func TestWriteError(t *testing.T) {
|
|||
|
||||
type dummyRaftTimer struct{}
|
||||
|
||||
func (drt dummyRaftTimer) Index() int64 { return int64(100) }
|
||||
func (drt dummyRaftTimer) Term() int64 { return int64(5) }
|
||||
func (drt dummyRaftTimer) Index() uint64 { return uint64(100) }
|
||||
func (drt dummyRaftTimer) Term() uint64 { return uint64(5) }
|
||||
|
||||
func TestWriteEvent(t *testing.T) {
|
||||
// nil event should not panic
|
||||
|
@ -1246,4 +1246,4 @@ func (c *fakeCluster) Get() etcdserver.Cluster {
|
|||
return *cl
|
||||
}
|
||||
|
||||
func (c *fakeCluster) Remove(id int64) { return }
|
||||
func (c *fakeCluster) Remove(id uint64) { return }
|
||||
|
|
|
@ -14,7 +14,7 @@ import (
|
|||
const machineKVPrefix = "/_etcd/machines/"
|
||||
|
||||
type Member struct {
|
||||
ID int64
|
||||
ID uint64
|
||||
Name string
|
||||
// TODO(philips): ensure these are URLs
|
||||
PeerURLs []string
|
||||
|
@ -36,14 +36,10 @@ func newMember(name string, peerURLs types.URLs, now *time.Time) *Member {
|
|||
}
|
||||
|
||||
hash := sha1.Sum(b)
|
||||
m.ID = int64(binary.BigEndian.Uint64(hash[:8]))
|
||||
if m.ID < 0 {
|
||||
m.ID = m.ID * -1
|
||||
}
|
||||
|
||||
m.ID = binary.BigEndian.Uint64(hash[:8])
|
||||
return m
|
||||
}
|
||||
|
||||
func (m Member) storeKey() string {
|
||||
return path.Join(machineKVPrefix, strconv.FormatUint(uint64(m.ID), 16))
|
||||
return path.Join(machineKVPrefix, strconv.FormatUint(m.ID, 16))
|
||||
}
|
||||
|
|
|
@ -17,9 +17,9 @@ func timeParse(value string) *time.Time {
|
|||
func TestMemberTime(t *testing.T) {
|
||||
tests := []struct {
|
||||
mem *Member
|
||||
id int64
|
||||
id uint64
|
||||
}{
|
||||
{newMember("mem1", []url.URL{{Scheme: "http", Host: "10.0.0.8:2379"}}, nil), 7206348984215161146},
|
||||
{newMember("mem1", []url.URL{{Scheme: "http", Host: "10.0.0.8:2379"}}, nil), 11240395089494390470},
|
||||
{newMember("mem1", []url.URL{{Scheme: "http", Host: "10.0.0.1:2379"}}, timeParse("1984-12-23T15:04:05Z")), 5483967913615174889},
|
||||
}
|
||||
for i, tt := range tests {
|
||||
|
|
|
@ -79,8 +79,8 @@ type Server interface {
|
|||
}
|
||||
|
||||
type RaftTimer interface {
|
||||
Index() int64
|
||||
Term() int64
|
||||
Index() uint64
|
||||
Term() uint64
|
||||
}
|
||||
|
||||
// NewServer creates a new EtcdServer from the supplied configuration. The
|
||||
|
@ -125,7 +125,7 @@ func NewServer(cfg *ServerConfig) *EtcdServer {
|
|||
if cfg.DiscoveryURL != "" {
|
||||
log.Printf("etcd: warn: ignoring discovery URL: etcd has already been initialized and has a valid log in %q", waldir)
|
||||
}
|
||||
var index int64
|
||||
var index uint64
|
||||
snapshot, err := ss.Load()
|
||||
if err != nil && err != snap.ErrNoSnapshot {
|
||||
log.Fatal(err)
|
||||
|
@ -194,11 +194,11 @@ type EtcdServer struct {
|
|||
ticker <-chan time.Time
|
||||
syncTicker <-chan time.Time
|
||||
|
||||
snapCount int64 // number of entries to trigger a snapshot
|
||||
snapCount uint64 // number of entries to trigger a snapshot
|
||||
|
||||
// Cache of the latest raft index and raft term the server has seen
|
||||
raftIndex int64
|
||||
raftTerm int64
|
||||
raftIndex uint64
|
||||
raftTerm uint64
|
||||
}
|
||||
|
||||
// Start prepares and starts server in a new goroutine. It is no longer safe to
|
||||
|
@ -231,8 +231,8 @@ func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
|
|||
func (s *EtcdServer) run() {
|
||||
var syncC <-chan time.Time
|
||||
// snapi indicates the index of the last submitted snapshot request
|
||||
var snapi, appliedi int64
|
||||
var nodes []int64
|
||||
var snapi, appliedi uint64
|
||||
var nodes []uint64
|
||||
for {
|
||||
select {
|
||||
case <-s.ticker:
|
||||
|
@ -260,12 +260,12 @@ func (s *EtcdServer) run() {
|
|||
panic("TODO: this is bad, what do we do about it?")
|
||||
}
|
||||
s.applyConfChange(cc)
|
||||
s.w.Trigger(cc.ID, nil)
|
||||
s.w.Trigger(int64(cc.ID), nil)
|
||||
default:
|
||||
panic("unexpected entry type")
|
||||
}
|
||||
atomic.StoreInt64(&s.raftIndex, e.Index)
|
||||
atomic.StoreInt64(&s.raftTerm, e.Term)
|
||||
atomic.StoreUint64(&s.raftIndex, e.Index)
|
||||
atomic.StoreUint64(&s.raftTerm, e.Term)
|
||||
appliedi = e.Index
|
||||
}
|
||||
|
||||
|
@ -378,7 +378,7 @@ func (s *EtcdServer) AddMember(ctx context.Context, memb Member) error {
|
|||
return s.configure(ctx, cc)
|
||||
}
|
||||
|
||||
func (s *EtcdServer) RemoveMember(ctx context.Context, id int64) error {
|
||||
func (s *EtcdServer) RemoveMember(ctx context.Context, id uint64) error {
|
||||
cc := raftpb.ConfChange{
|
||||
ID: GenID(),
|
||||
Type: raftpb.ConfChangeRemoveNode,
|
||||
|
@ -388,28 +388,28 @@ func (s *EtcdServer) RemoveMember(ctx context.Context, id int64) error {
|
|||
}
|
||||
|
||||
// Implement the RaftTimer interface
|
||||
func (s *EtcdServer) Index() int64 {
|
||||
return atomic.LoadInt64(&s.raftIndex)
|
||||
func (s *EtcdServer) Index() uint64 {
|
||||
return atomic.LoadUint64(&s.raftIndex)
|
||||
}
|
||||
|
||||
func (s *EtcdServer) Term() int64 {
|
||||
return atomic.LoadInt64(&s.raftTerm)
|
||||
func (s *EtcdServer) Term() uint64 {
|
||||
return atomic.LoadUint64(&s.raftTerm)
|
||||
}
|
||||
|
||||
// configure sends configuration change through consensus then performs it.
|
||||
// It will block until the change is performed or there is an error.
|
||||
func (s *EtcdServer) configure(ctx context.Context, cc raftpb.ConfChange) error {
|
||||
ch := s.w.Register(cc.ID)
|
||||
ch := s.w.Register(int64(cc.ID))
|
||||
if err := s.node.ProposeConfChange(ctx, cc); err != nil {
|
||||
log.Printf("configure error: %v", err)
|
||||
s.w.Trigger(cc.ID, nil)
|
||||
s.w.Trigger(int64(cc.ID), nil)
|
||||
return err
|
||||
}
|
||||
select {
|
||||
case <-ch:
|
||||
return nil
|
||||
case <-ctx.Done():
|
||||
s.w.Trigger(cc.ID, nil) // GC wait
|
||||
s.w.Trigger(int64(cc.ID), nil) // GC wait
|
||||
return ctx.Err()
|
||||
case <-s.done:
|
||||
return ErrStopped
|
||||
|
@ -423,7 +423,7 @@ func (s *EtcdServer) sync(timeout time.Duration) {
|
|||
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
||||
req := pb.Request{
|
||||
Method: "SYNC",
|
||||
ID: GenID(),
|
||||
ID: int64(GenID()),
|
||||
Time: time.Now().UnixNano(),
|
||||
}
|
||||
data, err := req.Marshal()
|
||||
|
@ -454,7 +454,7 @@ func (s *EtcdServer) publish(retryInterval time.Duration) {
|
|||
return
|
||||
}
|
||||
req := pb.Request{
|
||||
ID: GenID(),
|
||||
ID: int64(GenID()),
|
||||
Method: "PUT",
|
||||
Path: m.storeKey(),
|
||||
Val: string(b),
|
||||
|
@ -554,7 +554,7 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange) {
|
|||
}
|
||||
|
||||
// TODO: non-blocking snapshot
|
||||
func (s *EtcdServer) snapshot(snapi int64, snapnodes []int64) {
|
||||
func (s *EtcdServer) snapshot(snapi uint64, snapnodes []uint64) {
|
||||
d, err := s.store.Save()
|
||||
// TODO: current store will never fail to do a snapshot
|
||||
// what should we do if the store might fail?
|
||||
|
@ -567,9 +567,9 @@ func (s *EtcdServer) snapshot(snapi int64, snapnodes []int64) {
|
|||
|
||||
// TODO: move the function to /id pkg maybe?
|
||||
// GenID generates a random id that is not equal to 0.
|
||||
func GenID() (n int64) {
|
||||
func GenID() (n uint64) {
|
||||
for n == 0 {
|
||||
n = rand.Int63()
|
||||
n = uint64(rand.Int63())
|
||||
}
|
||||
return
|
||||
}
|
||||
|
|
|
@ -371,7 +371,7 @@ func TestApplyRequest(t *testing.T) {
|
|||
func TestClusterOf1(t *testing.T) { testServer(t, 1) }
|
||||
func TestClusterOf3(t *testing.T) { testServer(t, 3) }
|
||||
|
||||
func testServer(t *testing.T, ns int64) {
|
||||
func testServer(t *testing.T, ns uint64) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
|
@ -384,12 +384,12 @@ func testServer(t *testing.T, ns int64) {
|
|||
}
|
||||
}
|
||||
|
||||
members := make([]int64, ns)
|
||||
for i := int64(0); i < ns; i++ {
|
||||
members := make([]uint64, ns)
|
||||
for i := uint64(0); i < ns; i++ {
|
||||
members[i] = i + 1
|
||||
}
|
||||
|
||||
for i := int64(0); i < ns; i++ {
|
||||
for i := uint64(0); i < ns; i++ {
|
||||
id := i + 1
|
||||
n := raft.StartNode(id, members, 10, 1)
|
||||
tk := time.NewTicker(10 * time.Millisecond)
|
||||
|
@ -458,7 +458,7 @@ func TestDoProposal(t *testing.T) {
|
|||
|
||||
for i, tt := range tests {
|
||||
ctx, _ := context.WithCancel(context.Background())
|
||||
n := raft.StartNode(0xBAD0, []int64{0xBAD0}, 10, 1)
|
||||
n := raft.StartNode(0xBAD0, []uint64{0xBAD0}, 10, 1)
|
||||
st := &storeRecorder{}
|
||||
tk := make(chan time.Time)
|
||||
// this makes <-tk always successful, which accelerates internal clock
|
||||
|
@ -491,7 +491,7 @@ func TestDoProposal(t *testing.T) {
|
|||
func TestDoProposalCancelled(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
// node cannot make any progress because there are two nodes
|
||||
n := raft.StartNode(0xBAD0, []int64{0xBAD0, 0xBAD1}, 10, 1)
|
||||
n := raft.StartNode(0xBAD0, []uint64{0xBAD0, 0xBAD1}, 10, 1)
|
||||
st := &storeRecorder{}
|
||||
wait := &waitRecorder{}
|
||||
srv := &EtcdServer{
|
||||
|
@ -527,7 +527,7 @@ func TestDoProposalStopped(t *testing.T) {
|
|||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
// node cannot make any progress because there are two nodes
|
||||
n := raft.StartNode(0xBAD0, []int64{0xBAD0, 0xBAD1}, 10, 1)
|
||||
n := raft.StartNode(0xBAD0, []uint64{0xBAD0, 0xBAD1}, 10, 1)
|
||||
st := &storeRecorder{}
|
||||
tk := make(chan time.Time)
|
||||
// this makes <-tk always successful, which accelarates internal clock
|
||||
|
@ -668,7 +668,7 @@ func TestSyncTrigger(t *testing.T) {
|
|||
// snapshot should snapshot the store and cut the persistent
|
||||
// TODO: node.Compact is called... we need to make the node an interface
|
||||
func TestSnapshot(t *testing.T) {
|
||||
n := raft.StartNode(0xBAD0, []int64{0xBAD0}, 10, 1)
|
||||
n := raft.StartNode(0xBAD0, []uint64{0xBAD0}, 10, 1)
|
||||
defer n.Stop()
|
||||
st := &storeRecorder{}
|
||||
p := &storageRecorder{}
|
||||
|
@ -678,7 +678,7 @@ func TestSnapshot(t *testing.T) {
|
|||
node: n,
|
||||
}
|
||||
|
||||
s.snapshot(0, []int64{1})
|
||||
s.snapshot(0, []uint64{1})
|
||||
gaction := st.Action()
|
||||
if len(gaction) != 1 {
|
||||
t.Fatalf("len(action) = %d, want 1", len(gaction))
|
||||
|
@ -699,7 +699,7 @@ func TestSnapshot(t *testing.T) {
|
|||
// Applied > SnapCount should trigger a SaveSnap event
|
||||
func TestTriggerSnap(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
n := raft.StartNode(0xBAD0, []int64{0xBAD0}, 10, 1)
|
||||
n := raft.StartNode(0xBAD0, []uint64{0xBAD0}, 10, 1)
|
||||
n.Campaign(ctx)
|
||||
st := &storeRecorder{}
|
||||
p := &storageRecorder{}
|
||||
|
@ -712,7 +712,7 @@ func TestTriggerSnap(t *testing.T) {
|
|||
}
|
||||
|
||||
s.start()
|
||||
for i := 0; int64(i) < s.snapCount-1; i++ {
|
||||
for i := 0; uint64(i) < s.snapCount-1; i++ {
|
||||
s.Do(ctx, pb.Request{Method: "PUT", ID: 1})
|
||||
}
|
||||
time.Sleep(time.Millisecond)
|
||||
|
@ -825,7 +825,7 @@ func TestRemoveMember(t *testing.T) {
|
|||
ClusterStore: cs,
|
||||
}
|
||||
s.start()
|
||||
id := int64(1)
|
||||
id := uint64(1)
|
||||
s.RemoveMember(context.TODO(), id)
|
||||
gaction := n.Action()
|
||||
s.Stop()
|
||||
|
@ -962,9 +962,9 @@ func TestGenID(t *testing.T) {
|
|||
// Sanity check that the GenID function has been seeded appropriately
|
||||
// (math/rand is seeded with 1 by default)
|
||||
r := rand.NewSource(int64(1))
|
||||
var n int64
|
||||
var n uint64
|
||||
for n == 0 {
|
||||
n = r.Int63()
|
||||
n = uint64(r.Int63())
|
||||
}
|
||||
if n == GenID() {
|
||||
t.Fatalf("GenID's rand seeded with 1!")
|
||||
|
@ -1143,7 +1143,7 @@ func (n *readyNode) Step(ctx context.Context, msg raftpb.Message) error { return
|
|||
func (n *readyNode) Ready() <-chan raft.Ready { return n.readyc }
|
||||
func (n *readyNode) ApplyConfChange(conf raftpb.ConfChange) {}
|
||||
func (n *readyNode) Stop() {}
|
||||
func (n *readyNode) Compact(index int64, nodes []int64, d []byte) {}
|
||||
func (n *readyNode) Compact(index uint64, nodes []uint64, d []byte) {}
|
||||
|
||||
type nodeRecorder struct {
|
||||
recorder
|
||||
|
@ -1175,7 +1175,7 @@ func (n *nodeRecorder) ApplyConfChange(conf raftpb.ConfChange) {
|
|||
func (n *nodeRecorder) Stop() {
|
||||
n.record(action{name: "Stop"})
|
||||
}
|
||||
func (n *nodeRecorder) Compact(index int64, nodes []int64, d []byte) {
|
||||
func (n *nodeRecorder) Compact(index uint64, nodes []uint64, d []byte) {
|
||||
n.record(action{name: "Compact"})
|
||||
}
|
||||
|
||||
|
@ -1255,7 +1255,7 @@ func (cs *clusterStoreRecorder) Get() Cluster {
|
|||
cs.record(action{name: "Get"})
|
||||
return nil
|
||||
}
|
||||
func (cs *clusterStoreRecorder) Remove(id int64) {
|
||||
func (cs *clusterStoreRecorder) Remove(id uint64) {
|
||||
cs.record(action{name: "Remove", params: []interface{}{id}})
|
||||
}
|
||||
|
||||
|
|
2
main.go
2
main.go
|
@ -154,7 +154,7 @@ func startEtcd() {
|
|||
Name: *name,
|
||||
ClientURLs: acurls,
|
||||
DataDir: *dir,
|
||||
SnapCount: int64(*snapCount),
|
||||
SnapCount: *snapCount,
|
||||
Cluster: cluster,
|
||||
DiscoveryURL: *durl,
|
||||
ClusterState: *clusterState,
|
||||
|
|
|
@ -1,8 +1,8 @@
|
|||
package types
|
||||
|
||||
// Int64Slice implements sort interface
|
||||
type Int64Slice []int64
|
||||
// Uint64Slice implements sort interface
|
||||
type Uint64Slice []uint64
|
||||
|
||||
func (p Int64Slice) Len() int { return len(p) }
|
||||
func (p Int64Slice) Less(i, j int) bool { return p[i] < p[j] }
|
||||
func (p Int64Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
|
||||
func (p Uint64Slice) Len() int { return len(p) }
|
||||
func (p Uint64Slice) Less(i, j int) bool { return p[i] < p[j] }
|
||||
func (p Uint64Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
|
||||
|
|
62
raft/log.go
62
raft/log.go
|
@ -12,15 +12,15 @@ const (
|
|||
|
||||
type raftLog struct {
|
||||
ents []pb.Entry
|
||||
unstable int64
|
||||
committed int64
|
||||
applied int64
|
||||
offset int64
|
||||
unstable uint64
|
||||
committed uint64
|
||||
applied uint64
|
||||
offset uint64
|
||||
snapshot pb.Snapshot
|
||||
|
||||
// want a compact after the number of entries exceeds the threshold
|
||||
// TODO(xiangli) size might be a better criteria
|
||||
compactThreshold int64
|
||||
compactThreshold uint64
|
||||
}
|
||||
|
||||
func newLog() *raftLog {
|
||||
|
@ -39,20 +39,20 @@ func (l *raftLog) isEmpty() bool {
|
|||
|
||||
func (l *raftLog) load(ents []pb.Entry) {
|
||||
l.ents = ents
|
||||
l.unstable = l.offset + int64(len(ents))
|
||||
l.unstable = l.offset + uint64(len(ents))
|
||||
}
|
||||
|
||||
func (l *raftLog) String() string {
|
||||
return fmt.Sprintf("offset=%d committed=%d applied=%d len(ents)=%d", l.offset, l.committed, l.applied, len(l.ents))
|
||||
}
|
||||
|
||||
func (l *raftLog) maybeAppend(index, logTerm, committed int64, ents ...pb.Entry) bool {
|
||||
lastnewi := index + int64(len(ents))
|
||||
func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry) bool {
|
||||
lastnewi := index + uint64(len(ents))
|
||||
if l.matchTerm(index, logTerm) {
|
||||
from := index + 1
|
||||
ci := l.findConflict(from, ents)
|
||||
switch {
|
||||
case ci == -1:
|
||||
case ci == 0:
|
||||
case ci <= l.committed:
|
||||
panic("conflict with committed entry")
|
||||
default:
|
||||
|
@ -68,19 +68,19 @@ func (l *raftLog) maybeAppend(index, logTerm, committed int64, ents ...pb.Entry)
|
|||
return false
|
||||
}
|
||||
|
||||
func (l *raftLog) append(after int64, ents ...pb.Entry) int64 {
|
||||
func (l *raftLog) append(after uint64, ents ...pb.Entry) uint64 {
|
||||
l.ents = append(l.slice(l.offset, after+1), ents...)
|
||||
l.unstable = min(l.unstable, after+1)
|
||||
return l.lastIndex()
|
||||
}
|
||||
|
||||
func (l *raftLog) findConflict(from int64, ents []pb.Entry) int64 {
|
||||
func (l *raftLog) findConflict(from uint64, ents []pb.Entry) uint64 {
|
||||
for i, ne := range ents {
|
||||
if oe := l.at(from + int64(i)); oe == nil || oe.Term != ne.Term {
|
||||
return from + int64(i)
|
||||
if oe := l.at(from + uint64(i)); oe == nil || oe.Term != ne.Term {
|
||||
return from + uint64(i)
|
||||
}
|
||||
}
|
||||
return -1
|
||||
return 0
|
||||
}
|
||||
|
||||
func (l *raftLog) unstableEnts() []pb.Entry {
|
||||
|
@ -112,18 +112,18 @@ func (l *raftLog) resetNextEnts() {
|
|||
}
|
||||
}
|
||||
|
||||
func (l *raftLog) lastIndex() int64 {
|
||||
return int64(len(l.ents)) - 1 + l.offset
|
||||
func (l *raftLog) lastIndex() uint64 {
|
||||
return uint64(len(l.ents)) - 1 + l.offset
|
||||
}
|
||||
|
||||
func (l *raftLog) term(i int64) int64 {
|
||||
func (l *raftLog) term(i uint64) uint64 {
|
||||
if e := l.at(i); e != nil {
|
||||
return e.Term
|
||||
}
|
||||
return -1
|
||||
return 0
|
||||
}
|
||||
|
||||
func (l *raftLog) entries(i int64) []pb.Entry {
|
||||
func (l *raftLog) entries(i uint64) []pb.Entry {
|
||||
// never send out the first entry
|
||||
// first entry is only used for matching
|
||||
// prevLogTerm
|
||||
|
@ -133,19 +133,19 @@ func (l *raftLog) entries(i int64) []pb.Entry {
|
|||
return l.slice(i, l.lastIndex()+1)
|
||||
}
|
||||
|
||||
func (l *raftLog) isUpToDate(i, term int64) bool {
|
||||
func (l *raftLog) isUpToDate(i, term uint64) bool {
|
||||
e := l.at(l.lastIndex())
|
||||
return term > e.Term || (term == e.Term && i >= l.lastIndex())
|
||||
}
|
||||
|
||||
func (l *raftLog) matchTerm(i, term int64) bool {
|
||||
func (l *raftLog) matchTerm(i, term uint64) bool {
|
||||
if e := l.at(i); e != nil {
|
||||
return e.Term == term
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (l *raftLog) maybeCommit(maxIndex, term int64) bool {
|
||||
func (l *raftLog) maybeCommit(maxIndex, term uint64) bool {
|
||||
if maxIndex > l.committed && l.term(maxIndex) == term {
|
||||
l.committed = maxIndex
|
||||
return true
|
||||
|
@ -158,17 +158,17 @@ func (l *raftLog) maybeCommit(maxIndex, term int64) bool {
|
|||
// i must be not smaller than the index of the first entry
|
||||
// and not greater than the index of the last entry.
|
||||
// the number of entries after compaction will be returned.
|
||||
func (l *raftLog) compact(i int64) int64 {
|
||||
func (l *raftLog) compact(i uint64) uint64 {
|
||||
if l.isOutOfAppliedBounds(i) {
|
||||
panic(fmt.Sprintf("compact %d out of bounds [%d:%d]", i, l.offset, l.applied))
|
||||
}
|
||||
l.ents = l.slice(i, l.lastIndex()+1)
|
||||
l.unstable = max(i+1, l.unstable)
|
||||
l.offset = i
|
||||
return int64(len(l.ents))
|
||||
return uint64(len(l.ents))
|
||||
}
|
||||
|
||||
func (l *raftLog) snap(d []byte, index, term int64, nodes []int64, removed []int64) {
|
||||
func (l *raftLog) snap(d []byte, index, term uint64, nodes []uint64, removed []uint64) {
|
||||
l.snapshot = pb.Snapshot{
|
||||
Data: d,
|
||||
Nodes: nodes,
|
||||
|
@ -191,7 +191,7 @@ func (l *raftLog) restore(s pb.Snapshot) {
|
|||
l.snapshot = s
|
||||
}
|
||||
|
||||
func (l *raftLog) at(i int64) *pb.Entry {
|
||||
func (l *raftLog) at(i uint64) *pb.Entry {
|
||||
if l.isOutOfBounds(i) {
|
||||
return nil
|
||||
}
|
||||
|
@ -199,7 +199,7 @@ func (l *raftLog) at(i int64) *pb.Entry {
|
|||
}
|
||||
|
||||
// slice returns a slice of log entries from lo through hi-1, inclusive.
|
||||
func (l *raftLog) slice(lo int64, hi int64) []pb.Entry {
|
||||
func (l *raftLog) slice(lo uint64, hi uint64) []pb.Entry {
|
||||
if lo >= hi {
|
||||
return nil
|
||||
}
|
||||
|
@ -209,28 +209,28 @@ func (l *raftLog) slice(lo int64, hi int64) []pb.Entry {
|
|||
return l.ents[lo-l.offset : hi-l.offset]
|
||||
}
|
||||
|
||||
func (l *raftLog) isOutOfBounds(i int64) bool {
|
||||
func (l *raftLog) isOutOfBounds(i uint64) bool {
|
||||
if i < l.offset || i > l.lastIndex() {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (l *raftLog) isOutOfAppliedBounds(i int64) bool {
|
||||
func (l *raftLog) isOutOfAppliedBounds(i uint64) bool {
|
||||
if i < l.offset || i > l.applied {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func min(a, b int64) int64 {
|
||||
func min(a, b uint64) uint64 {
|
||||
if a > b {
|
||||
return b
|
||||
}
|
||||
return a
|
||||
}
|
||||
|
||||
func max(a, b int64) int64 {
|
||||
func max(a, b uint64) uint64 {
|
||||
if a > b {
|
||||
return a
|
||||
}
|
||||
|
|
|
@ -14,13 +14,13 @@ import (
|
|||
// 2.Append any new entries not already in the log
|
||||
func TestAppend(t *testing.T) {
|
||||
previousEnts := []pb.Entry{{Term: 1}, {Term: 2}}
|
||||
previousUnstable := int64(3)
|
||||
previousUnstable := uint64(3)
|
||||
tests := []struct {
|
||||
after int64
|
||||
after uint64
|
||||
ents []pb.Entry
|
||||
windex int64
|
||||
windex uint64
|
||||
wents []pb.Entry
|
||||
wunstable int64
|
||||
wunstable uint64
|
||||
}{
|
||||
{
|
||||
2,
|
||||
|
@ -74,13 +74,13 @@ func TestAppend(t *testing.T) {
|
|||
// TestCompactionSideEffects ensures that all the log related funcationality works correctly after
|
||||
// a compaction.
|
||||
func TestCompactionSideEffects(t *testing.T) {
|
||||
var i int64
|
||||
lastIndex := int64(1000)
|
||||
var i uint64
|
||||
lastIndex := uint64(1000)
|
||||
lastTerm := lastIndex
|
||||
raftLog := newLog()
|
||||
|
||||
for i = 0; i < lastIndex; i++ {
|
||||
raftLog.append(int64(i), pb.Entry{Term: int64(i + 1), Index: int64(i + 1)})
|
||||
raftLog.append(uint64(i), pb.Entry{Term: uint64(i + 1), Index: uint64(i + 1)})
|
||||
}
|
||||
raftLog.maybeCommit(lastIndex, lastTerm)
|
||||
raftLog.resetNextEnts()
|
||||
|
@ -126,9 +126,9 @@ func TestCompactionSideEffects(t *testing.T) {
|
|||
func TestUnstableEnts(t *testing.T) {
|
||||
previousEnts := []pb.Entry{{Term: 1, Index: 1}, {Term: 2, Index: 2}}
|
||||
tests := []struct {
|
||||
unstable int64
|
||||
unstable uint64
|
||||
wents []pb.Entry
|
||||
wunstable int64
|
||||
wunstable uint64
|
||||
}{
|
||||
{3, nil, 3},
|
||||
{1, previousEnts, 3},
|
||||
|
@ -152,18 +152,18 @@ func TestUnstableEnts(t *testing.T) {
|
|||
//TestCompaction ensures that the number of log entreis is correct after compactions.
|
||||
func TestCompaction(t *testing.T) {
|
||||
tests := []struct {
|
||||
applied int64
|
||||
lastIndex int64
|
||||
compact []int64
|
||||
applied uint64
|
||||
lastIndex uint64
|
||||
compact []uint64
|
||||
wleft []int
|
||||
wallow bool
|
||||
}{
|
||||
// out of upper bound
|
||||
{1000, 1000, []int64{1001}, []int{-1}, false},
|
||||
{1000, 1000, []int64{300, 500, 800, 900}, []int{701, 501, 201, 101}, true},
|
||||
{1000, 1000, []uint64{1001}, []int{-1}, false},
|
||||
{1000, 1000, []uint64{300, 500, 800, 900}, []int{701, 501, 201, 101}, true},
|
||||
// out of lower bound
|
||||
{1000, 1000, []int64{300, 299}, []int{701, -1}, false},
|
||||
{0, 1000, []int64{1}, []int{-1}, false},
|
||||
{1000, 1000, []uint64{300, 299}, []int{701, -1}, false},
|
||||
{0, 1000, []uint64{1}, []int{-1}, false},
|
||||
}
|
||||
|
||||
for i, tt := range tests {
|
||||
|
@ -177,8 +177,8 @@ func TestCompaction(t *testing.T) {
|
|||
}()
|
||||
|
||||
raftLog := newLog()
|
||||
for i := int64(0); i < tt.lastIndex; i++ {
|
||||
raftLog.append(int64(i), pb.Entry{})
|
||||
for i := uint64(0); i < tt.lastIndex; i++ {
|
||||
raftLog.append(uint64(i), pb.Entry{})
|
||||
}
|
||||
raftLog.maybeCommit(tt.applied, 0)
|
||||
raftLog.resetNextEnts()
|
||||
|
@ -194,14 +194,14 @@ func TestCompaction(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestLogRestore(t *testing.T) {
|
||||
var i int64
|
||||
var i uint64
|
||||
raftLog := newLog()
|
||||
for i = 0; i < 100; i++ {
|
||||
raftLog.append(i, pb.Entry{Term: i + 1})
|
||||
}
|
||||
|
||||
index := int64(1000)
|
||||
term := int64(1000)
|
||||
index := uint64(1000)
|
||||
term := uint64(1000)
|
||||
raftLog.restore(pb.Snapshot{Index: index, Term: term})
|
||||
|
||||
// only has the guard entry
|
||||
|
@ -226,12 +226,12 @@ func TestLogRestore(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestIsOutOfBounds(t *testing.T) {
|
||||
offset := int64(100)
|
||||
num := int64(100)
|
||||
offset := uint64(100)
|
||||
num := uint64(100)
|
||||
l := &raftLog{offset: offset, ents: make([]pb.Entry, num)}
|
||||
|
||||
tests := []struct {
|
||||
index int64
|
||||
index uint64
|
||||
w bool
|
||||
}{
|
||||
{offset - 1, true},
|
||||
|
@ -250,9 +250,9 @@ func TestIsOutOfBounds(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestAt(t *testing.T) {
|
||||
var i int64
|
||||
offset := int64(100)
|
||||
num := int64(100)
|
||||
var i uint64
|
||||
offset := uint64(100)
|
||||
num := uint64(100)
|
||||
|
||||
l := &raftLog{offset: offset}
|
||||
for i = 0; i < num; i++ {
|
||||
|
@ -260,7 +260,7 @@ func TestAt(t *testing.T) {
|
|||
}
|
||||
|
||||
tests := []struct {
|
||||
index int64
|
||||
index uint64
|
||||
w *pb.Entry
|
||||
}{
|
||||
{offset - 1, nil},
|
||||
|
@ -279,9 +279,9 @@ func TestAt(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestSlice(t *testing.T) {
|
||||
var i int64
|
||||
offset := int64(100)
|
||||
num := int64(100)
|
||||
var i uint64
|
||||
offset := uint64(100)
|
||||
num := uint64(100)
|
||||
|
||||
l := &raftLog{offset: offset}
|
||||
for i = 0; i < num; i++ {
|
||||
|
@ -289,8 +289,8 @@ func TestSlice(t *testing.T) {
|
|||
}
|
||||
|
||||
tests := []struct {
|
||||
from int64
|
||||
to int64
|
||||
from uint64
|
||||
to uint64
|
||||
w []pb.Entry
|
||||
}{
|
||||
{offset - 1, offset + 1, nil},
|
||||
|
|
22
raft/node.go
22
raft/node.go
|
@ -19,9 +19,9 @@ var (
|
|||
// SoftState provides state that is useful for logging and debugging.
|
||||
// The state is volatile and does not need to be persisted to the WAL.
|
||||
type SoftState struct {
|
||||
Lead int64
|
||||
Lead uint64
|
||||
RaftState StateType
|
||||
Nodes []int64
|
||||
Nodes []uint64
|
||||
ShouldStop bool
|
||||
}
|
||||
|
||||
|
@ -61,8 +61,8 @@ type Ready struct {
|
|||
}
|
||||
|
||||
type compact struct {
|
||||
index int64
|
||||
nodes []int64
|
||||
index uint64
|
||||
nodes []uint64
|
||||
data []byte
|
||||
}
|
||||
|
||||
|
@ -114,13 +114,13 @@ type Node interface {
|
|||
// It is the caller's responsibility to ensure the given configuration
|
||||
// and snapshot data match the actual point-in-time configuration and snapshot
|
||||
// at the given index.
|
||||
Compact(index int64, nodes []int64, d []byte)
|
||||
Compact(index uint64, nodes []uint64, d []byte)
|
||||
}
|
||||
|
||||
// StartNode returns a new Node given a unique raft id, a list of raft peers, and
|
||||
// the election and heartbeat timeouts in units of ticks.
|
||||
// It also builds ConfChangeAddNode entry for each peer and puts them at the head of the log.
|
||||
func StartNode(id int64, peers []int64, election, heartbeat int) Node {
|
||||
func StartNode(id uint64, peers []uint64, election, heartbeat int) Node {
|
||||
n := newNode()
|
||||
r := newRaft(id, peers, election, heartbeat)
|
||||
|
||||
|
@ -131,10 +131,10 @@ func StartNode(id int64, peers []int64, election, heartbeat int) Node {
|
|||
if err != nil {
|
||||
panic("unexpected marshal error")
|
||||
}
|
||||
ents[i] = pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: int64(i + 1), Data: data}
|
||||
ents[i] = pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: uint64(i + 1), Data: data}
|
||||
}
|
||||
r.raftLog.append(0, ents...)
|
||||
r.raftLog.committed = int64(len(ents))
|
||||
r.raftLog.committed = uint64(len(ents))
|
||||
|
||||
go n.run(r)
|
||||
return &n
|
||||
|
@ -143,7 +143,7 @@ func StartNode(id int64, peers []int64, election, heartbeat int) Node {
|
|||
// RestartNode is identical to StartNode but takes an initial State and a slice
|
||||
// of entries. Generally this is used when restarting from a stable storage
|
||||
// log.
|
||||
func RestartNode(id int64, peers []int64, election, heartbeat int, snapshot *pb.Snapshot, st pb.HardState, ents []pb.Entry) Node {
|
||||
func RestartNode(id uint64, peers []uint64, election, heartbeat int, snapshot *pb.Snapshot, st pb.HardState, ents []pb.Entry) Node {
|
||||
n := newNode()
|
||||
r := newRaft(id, peers, election, heartbeat)
|
||||
if snapshot != nil {
|
||||
|
@ -317,14 +317,14 @@ func (n *node) ApplyConfChange(cc pb.ConfChange) {
|
|||
}
|
||||
}
|
||||
|
||||
func (n *node) Compact(index int64, nodes []int64, d []byte) {
|
||||
func (n *node) Compact(index uint64, nodes []uint64, d []byte) {
|
||||
select {
|
||||
case n.compactc <- compact{index, nodes, d}:
|
||||
case <-n.done:
|
||||
}
|
||||
}
|
||||
|
||||
func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState, prevSnapi int64) Ready {
|
||||
func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState, prevSnapi uint64) Ready {
|
||||
rd := Ready{
|
||||
Entries: r.raftLog.unstableEnts(),
|
||||
CommittedEntries: r.raftLog.nextEnts(),
|
||||
|
|
|
@ -18,10 +18,10 @@ func TestNodeStep(t *testing.T) {
|
|||
propc: make(chan raftpb.Message, 1),
|
||||
recvc: make(chan raftpb.Message, 1),
|
||||
}
|
||||
msgt := int64(i)
|
||||
msgt := uint64(i)
|
||||
n.Step(context.TODO(), raftpb.Message{Type: msgt})
|
||||
// Proposal goes to proc chan. Others go to recvc chan.
|
||||
if int64(i) == msgProp {
|
||||
if uint64(i) == msgProp {
|
||||
select {
|
||||
case <-n.propc:
|
||||
default:
|
||||
|
@ -96,7 +96,7 @@ func TestNodeStepUnblock(t *testing.T) {
|
|||
// who is the current leader.
|
||||
func TestBlockProposal(t *testing.T) {
|
||||
n := newNode()
|
||||
r := newRaft(1, []int64{1}, 10, 1)
|
||||
r := newRaft(1, []uint64{1}, 10, 1)
|
||||
go n.run(r)
|
||||
defer n.Stop()
|
||||
|
||||
|
@ -156,7 +156,7 @@ func TestNode(t *testing.T) {
|
|||
}
|
||||
wants := []Ready{
|
||||
{
|
||||
SoftState: &SoftState{Lead: 1, Nodes: []int64{1}, RaftState: StateLeader},
|
||||
SoftState: &SoftState{Lead: 1, Nodes: []uint64{1}, RaftState: StateLeader},
|
||||
HardState: raftpb.HardState{Term: 1, Commit: 2},
|
||||
Entries: []raftpb.Entry{
|
||||
{},
|
||||
|
@ -175,7 +175,7 @@ func TestNode(t *testing.T) {
|
|||
},
|
||||
}
|
||||
|
||||
n := StartNode(1, []int64{1}, 10, 1)
|
||||
n := StartNode(1, []uint64{1}, 10, 1)
|
||||
n.Campaign(ctx)
|
||||
if g := <-n.Ready(); !reflect.DeepEqual(g, wants[0]) {
|
||||
t.Errorf("#%d: g = %+v,\n w %+v", 1, g, wants[0])
|
||||
|
@ -207,7 +207,7 @@ func TestNodeRestart(t *testing.T) {
|
|||
CommittedEntries: entries[1 : st.Commit+1],
|
||||
}
|
||||
|
||||
n := RestartNode(1, []int64{1}, 10, 1, nil, st, entries)
|
||||
n := RestartNode(1, []uint64{1}, 10, 1, nil, st, entries)
|
||||
if g := <-n.Ready(); !reflect.DeepEqual(g, want) {
|
||||
t.Errorf("g = %+v,\n w %+v", g, want)
|
||||
}
|
||||
|
@ -224,7 +224,7 @@ func TestNodeRestart(t *testing.T) {
|
|||
func TestNodeCompact(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
n := newNode()
|
||||
r := newRaft(1, []int64{1}, 10, 1)
|
||||
r := newRaft(1, []uint64{1}, 10, 1)
|
||||
go n.run(r)
|
||||
|
||||
n.Campaign(ctx)
|
||||
|
@ -234,8 +234,8 @@ func TestNodeCompact(t *testing.T) {
|
|||
Term: 1,
|
||||
Index: 2, // one nop + one proposal
|
||||
Data: []byte("a snapshot"),
|
||||
Nodes: []int64{1},
|
||||
RemovedNodes: []int64{},
|
||||
Nodes: []uint64{1},
|
||||
RemovedNodes: []uint64{},
|
||||
}
|
||||
|
||||
pkg.ForceGosched()
|
||||
|
@ -279,7 +279,7 @@ func TestSoftStateEqual(t *testing.T) {
|
|||
{&SoftState{Lead: 1}, false},
|
||||
{&SoftState{RaftState: StateLeader}, false},
|
||||
{&SoftState{ShouldStop: true}, false},
|
||||
{&SoftState{Nodes: []int64{1, 2}}, false},
|
||||
{&SoftState{Nodes: []uint64{1, 2}}, false},
|
||||
}
|
||||
for i, tt := range tests {
|
||||
if g := tt.st.equal(&SoftState{}); g != tt.we {
|
||||
|
|
84
raft/raft.go
84
raft/raft.go
|
@ -10,12 +10,12 @@ import (
|
|||
)
|
||||
|
||||
// None is a placeholder node ID used when there is no leader.
|
||||
const None int64 = 0
|
||||
const None uint64 = 0
|
||||
|
||||
type messageType int64
|
||||
type messageType uint64
|
||||
|
||||
const (
|
||||
msgHup int64 = iota
|
||||
msgHup uint64 = iota
|
||||
msgBeat
|
||||
msgProp
|
||||
msgApp
|
||||
|
@ -39,7 +39,7 @@ var mtmap = [...]string{
|
|||
}
|
||||
|
||||
func (mt messageType) String() string {
|
||||
return mtmap[int64(mt)]
|
||||
return mtmap[uint64(mt)]
|
||||
}
|
||||
|
||||
var errNoLeader = errors.New("no leader")
|
||||
|
@ -52,7 +52,7 @@ const (
|
|||
)
|
||||
|
||||
// StateType represents the role of a node in a cluster.
|
||||
type StateType int64
|
||||
type StateType uint64
|
||||
|
||||
var stmap = [...]string{
|
||||
"StateFollower",
|
||||
|
@ -61,21 +61,21 @@ var stmap = [...]string{
|
|||
}
|
||||
|
||||
func (st StateType) String() string {
|
||||
return stmap[int64(st)]
|
||||
return stmap[uint64(st)]
|
||||
}
|
||||
|
||||
type progress struct {
|
||||
match, next int64
|
||||
match, next uint64
|
||||
}
|
||||
|
||||
func (pr *progress) update(n int64) {
|
||||
func (pr *progress) update(n uint64) {
|
||||
pr.match = n
|
||||
pr.next = n + 1
|
||||
}
|
||||
|
||||
// maybeDecrTo returns false if the given to index comes from an out of order message.
|
||||
// Otherwise it decreases the progress next index and returns true.
|
||||
func (pr *progress) maybeDecrTo(to int64) bool {
|
||||
func (pr *progress) maybeDecrTo(to uint64) bool {
|
||||
// the rejection must be stale if the
|
||||
// progress has matched with follower
|
||||
// or "to" does not match next - 1
|
||||
|
@ -93,37 +93,37 @@ func (pr *progress) String() string {
|
|||
return fmt.Sprintf("n=%d m=%d", pr.next, pr.match)
|
||||
}
|
||||
|
||||
// int64Slice implements sort interface
|
||||
type int64Slice []int64
|
||||
// uint64Slice implements sort interface
|
||||
type uint64Slice []uint64
|
||||
|
||||
func (p int64Slice) Len() int { return len(p) }
|
||||
func (p int64Slice) Less(i, j int) bool { return p[i] < p[j] }
|
||||
func (p int64Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
|
||||
func (p uint64Slice) Len() int { return len(p) }
|
||||
func (p uint64Slice) Less(i, j int) bool { return p[i] < p[j] }
|
||||
func (p uint64Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
|
||||
|
||||
type raft struct {
|
||||
pb.HardState
|
||||
|
||||
id int64
|
||||
id uint64
|
||||
|
||||
// the log
|
||||
raftLog *raftLog
|
||||
|
||||
prs map[int64]*progress
|
||||
prs map[uint64]*progress
|
||||
|
||||
state StateType
|
||||
|
||||
votes map[int64]bool
|
||||
votes map[uint64]bool
|
||||
|
||||
msgs []pb.Message
|
||||
|
||||
// the leader id
|
||||
lead int64
|
||||
lead uint64
|
||||
|
||||
// New configuration is ignored if there exists unapplied configuration.
|
||||
pendingConf bool
|
||||
|
||||
// TODO: need GC and recovery from snapshot
|
||||
removed map[int64]bool
|
||||
removed map[uint64]bool
|
||||
|
||||
elapsed int // number of ticks since the last msg
|
||||
heartbeatTimeout int
|
||||
|
@ -132,17 +132,17 @@ type raft struct {
|
|||
step stepFunc
|
||||
}
|
||||
|
||||
func newRaft(id int64, peers []int64, election, heartbeat int) *raft {
|
||||
func newRaft(id uint64, peers []uint64, election, heartbeat int) *raft {
|
||||
if id == None {
|
||||
panic("cannot use none id")
|
||||
}
|
||||
rand.Seed(id)
|
||||
rand.Seed(int64(id))
|
||||
r := &raft{
|
||||
id: id,
|
||||
lead: None,
|
||||
raftLog: newLog(),
|
||||
prs: make(map[int64]*progress),
|
||||
removed: make(map[int64]bool),
|
||||
prs: make(map[uint64]*progress),
|
||||
removed: make(map[uint64]bool),
|
||||
electionTimeout: election,
|
||||
heartbeatTimeout: heartbeat,
|
||||
}
|
||||
|
@ -174,7 +174,7 @@ func (r *raft) String() string {
|
|||
return s
|
||||
}
|
||||
|
||||
func (r *raft) poll(id int64, v bool) (granted int) {
|
||||
func (r *raft) poll(id uint64, v bool) (granted int) {
|
||||
if _, ok := r.votes[id]; !ok {
|
||||
r.votes[id] = v
|
||||
}
|
||||
|
@ -199,7 +199,7 @@ func (r *raft) send(m pb.Message) {
|
|||
}
|
||||
|
||||
// sendAppend sends RRPC, with entries to the given peer.
|
||||
func (r *raft) sendAppend(to int64) {
|
||||
func (r *raft) sendAppend(to uint64) {
|
||||
pr := r.prs[to]
|
||||
m := pb.Message{}
|
||||
m.To = to
|
||||
|
@ -217,7 +217,7 @@ func (r *raft) sendAppend(to int64) {
|
|||
}
|
||||
|
||||
// sendHeartbeat sends an empty msgApp
|
||||
func (r *raft) sendHeartbeat(to int64) {
|
||||
func (r *raft) sendHeartbeat(to uint64) {
|
||||
m := pb.Message{
|
||||
To: to,
|
||||
Type: msgApp,
|
||||
|
@ -247,7 +247,7 @@ func (r *raft) bcastHeartbeat() {
|
|||
|
||||
func (r *raft) maybeCommit() bool {
|
||||
// TODO(bmizerany): optimize.. Currently naive
|
||||
mis := make(int64Slice, 0, len(r.prs))
|
||||
mis := make(uint64Slice, 0, len(r.prs))
|
||||
for i := range r.prs {
|
||||
mis = append(mis, r.prs[i].match)
|
||||
}
|
||||
|
@ -257,12 +257,12 @@ func (r *raft) maybeCommit() bool {
|
|||
return r.raftLog.maybeCommit(mci, r.Term)
|
||||
}
|
||||
|
||||
func (r *raft) reset(term int64) {
|
||||
func (r *raft) reset(term uint64) {
|
||||
r.Term = term
|
||||
r.lead = None
|
||||
r.Vote = None
|
||||
r.elapsed = 0
|
||||
r.votes = make(map[int64]bool)
|
||||
r.votes = make(map[uint64]bool)
|
||||
for i := range r.prs {
|
||||
r.prs[i] = &progress{next: r.raftLog.lastIndex() + 1}
|
||||
if i == r.id {
|
||||
|
@ -306,7 +306,7 @@ func (r *raft) tickHeartbeat() {
|
|||
}
|
||||
}
|
||||
|
||||
func (r *raft) becomeFollower(term int64, lead int64) {
|
||||
func (r *raft) becomeFollower(term uint64, lead uint64) {
|
||||
r.step = stepFollower
|
||||
r.reset(term)
|
||||
r.tick = r.tickElection
|
||||
|
@ -423,12 +423,12 @@ func (r *raft) handleSnapshot(m pb.Message) {
|
|||
}
|
||||
}
|
||||
|
||||
func (r *raft) addNode(id int64) {
|
||||
func (r *raft) addNode(id uint64) {
|
||||
r.setProgress(id, 0, r.raftLog.lastIndex()+1)
|
||||
r.pendingConf = false
|
||||
}
|
||||
|
||||
func (r *raft) removeNode(id int64) {
|
||||
func (r *raft) removeNode(id uint64) {
|
||||
r.delProgress(id)
|
||||
r.pendingConf = false
|
||||
r.removed[id] = true
|
||||
|
@ -519,7 +519,7 @@ func stepFollower(r *raft, m pb.Message) {
|
|||
}
|
||||
}
|
||||
|
||||
func (r *raft) compact(index int64, nodes []int64, d []byte) {
|
||||
func (r *raft) compact(index uint64, nodes []uint64, d []byte) {
|
||||
if index > r.raftLog.applied {
|
||||
panic(fmt.Sprintf("raft: compact index (%d) exceeds applied index (%d)", index, r.raftLog.applied))
|
||||
}
|
||||
|
@ -538,7 +538,7 @@ func (r *raft) restore(s pb.Snapshot) bool {
|
|||
}
|
||||
|
||||
r.raftLog.restore(s)
|
||||
r.prs = make(map[int64]*progress)
|
||||
r.prs = make(map[uint64]*progress)
|
||||
for _, n := range s.Nodes {
|
||||
if n == r.id {
|
||||
r.setProgress(n, r.raftLog.lastIndex(), r.raftLog.lastIndex()+1)
|
||||
|
@ -546,14 +546,14 @@ func (r *raft) restore(s pb.Snapshot) bool {
|
|||
r.setProgress(n, 0, r.raftLog.lastIndex()+1)
|
||||
}
|
||||
}
|
||||
r.removed = make(map[int64]bool)
|
||||
r.removed = make(map[uint64]bool)
|
||||
for _, n := range s.RemovedNodes {
|
||||
r.removed[n] = true
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (r *raft) needSnapshot(i int64) bool {
|
||||
func (r *raft) needSnapshot(i uint64) bool {
|
||||
if i < r.raftLog.offset {
|
||||
if r.raftLog.snapshot.Term == 0 {
|
||||
panic("need non-empty snapshot")
|
||||
|
@ -563,27 +563,27 @@ func (r *raft) needSnapshot(i int64) bool {
|
|||
return false
|
||||
}
|
||||
|
||||
func (r *raft) nodes() []int64 {
|
||||
nodes := make([]int64, 0, len(r.prs))
|
||||
func (r *raft) nodes() []uint64 {
|
||||
nodes := make([]uint64, 0, len(r.prs))
|
||||
for k := range r.prs {
|
||||
nodes = append(nodes, k)
|
||||
}
|
||||
return nodes
|
||||
}
|
||||
|
||||
func (r *raft) removedNodes() []int64 {
|
||||
removed := make([]int64, 0, len(r.removed))
|
||||
func (r *raft) removedNodes() []uint64 {
|
||||
removed := make([]uint64, 0, len(r.removed))
|
||||
for k := range r.removed {
|
||||
removed = append(removed, k)
|
||||
}
|
||||
return removed
|
||||
}
|
||||
|
||||
func (r *raft) setProgress(id, match, next int64) {
|
||||
func (r *raft) setProgress(id, match, next uint64) {
|
||||
r.prs[id] = &progress{next: next, match: match}
|
||||
}
|
||||
|
||||
func (r *raft) delProgress(id int64) {
|
||||
func (r *raft) delProgress(id uint64) {
|
||||
delete(r.prs, id)
|
||||
}
|
||||
|
||||
|
|
|
@ -58,7 +58,7 @@ func TestLogReplication(t *testing.T) {
|
|||
tests := []struct {
|
||||
*network
|
||||
msgs []pb.Message
|
||||
wcommitted int64
|
||||
wcommitted uint64
|
||||
}{
|
||||
{
|
||||
newNetwork(nil, nil, nil),
|
||||
|
@ -202,9 +202,9 @@ func TestCommitWithoutNewTermEntry(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestDuelingCandidates(t *testing.T) {
|
||||
a := newRaft(-1, nil, 10, 1) // k, id are set later
|
||||
b := newRaft(-1, nil, 10, 1)
|
||||
c := newRaft(-1, nil, 10, 1)
|
||||
a := newRaft(1, []uint64{1, 2, 3}, 10, 1)
|
||||
b := newRaft(2, []uint64{1, 2, 3}, 10, 1)
|
||||
c := newRaft(3, []uint64{1, 2, 3}, 10, 1)
|
||||
|
||||
nt := newNetwork(a, b, c)
|
||||
nt.cut(1, 3)
|
||||
|
@ -219,7 +219,7 @@ func TestDuelingCandidates(t *testing.T) {
|
|||
tests := []struct {
|
||||
sm *raft
|
||||
state StateType
|
||||
term int64
|
||||
term uint64
|
||||
raftLog *raftLog
|
||||
}{
|
||||
{a, StateFollower, 2, wlog},
|
||||
|
@ -235,7 +235,7 @@ func TestDuelingCandidates(t *testing.T) {
|
|||
t.Errorf("#%d: term = %d, want %d", i, g, tt.term)
|
||||
}
|
||||
base := ltoa(tt.raftLog)
|
||||
if sm, ok := nt.peers[1+int64(i)].(*raft); ok {
|
||||
if sm, ok := nt.peers[1+uint64(i)].(*raft); ok {
|
||||
l := ltoa(sm.raftLog)
|
||||
if g := diffu(base, l); g != "" {
|
||||
t.Errorf("#%d: diff:\n%s", i, g)
|
||||
|
@ -411,15 +411,15 @@ func TestProposalByProxy(t *testing.T) {
|
|||
|
||||
func TestCompact(t *testing.T) {
|
||||
tests := []struct {
|
||||
compacti int64
|
||||
nodes []int64
|
||||
removed []int64
|
||||
compacti uint64
|
||||
nodes []uint64
|
||||
removed []uint64
|
||||
snapd []byte
|
||||
wpanic bool
|
||||
}{
|
||||
{1, []int64{1, 2, 3}, []int64{4, 5}, []byte("some data"), false},
|
||||
{2, []int64{1, 2, 3}, []int64{4, 5}, []byte("some data"), false},
|
||||
{4, []int64{1, 2, 3}, []int64{4, 5}, []byte("some data"), true}, // compact out of range
|
||||
{1, []uint64{1, 2, 3}, []uint64{4, 5}, []byte("some data"), false},
|
||||
{2, []uint64{1, 2, 3}, []uint64{4, 5}, []byte("some data"), false},
|
||||
{4, []uint64{1, 2, 3}, []uint64{4, 5}, []byte("some data"), true}, // compact out of range
|
||||
}
|
||||
|
||||
for i, tt := range tests {
|
||||
|
@ -438,14 +438,14 @@ func TestCompact(t *testing.T) {
|
|||
applied: 2,
|
||||
ents: []pb.Entry{{}, {Term: 1}, {Term: 1}, {Term: 1}},
|
||||
},
|
||||
removed: make(map[int64]bool),
|
||||
removed: make(map[uint64]bool),
|
||||
}
|
||||
for _, r := range tt.removed {
|
||||
sm.removeNode(r)
|
||||
}
|
||||
sm.compact(tt.compacti, tt.nodes, tt.snapd)
|
||||
sort.Sort(int64Slice(sm.raftLog.snapshot.Nodes))
|
||||
sort.Sort(int64Slice(sm.raftLog.snapshot.RemovedNodes))
|
||||
sort.Sort(uint64Slice(sm.raftLog.snapshot.Nodes))
|
||||
sort.Sort(uint64Slice(sm.raftLog.snapshot.RemovedNodes))
|
||||
if sm.raftLog.offset != tt.compacti {
|
||||
t.Errorf("%d: log.offset = %d, want %d", i, sm.raftLog.offset, tt.compacti)
|
||||
}
|
||||
|
@ -464,36 +464,36 @@ func TestCompact(t *testing.T) {
|
|||
|
||||
func TestCommit(t *testing.T) {
|
||||
tests := []struct {
|
||||
matches []int64
|
||||
matches []uint64
|
||||
logs []pb.Entry
|
||||
smTerm int64
|
||||
w int64
|
||||
smTerm uint64
|
||||
w uint64
|
||||
}{
|
||||
// single
|
||||
{[]int64{1}, []pb.Entry{{}, {Term: 1}}, 1, 1},
|
||||
{[]int64{1}, []pb.Entry{{}, {Term: 1}}, 2, 0},
|
||||
{[]int64{2}, []pb.Entry{{}, {Term: 1}, {Term: 2}}, 2, 2},
|
||||
{[]int64{1}, []pb.Entry{{}, {Term: 2}}, 2, 1},
|
||||
{[]uint64{1}, []pb.Entry{{}, {Term: 1}}, 1, 1},
|
||||
{[]uint64{1}, []pb.Entry{{}, {Term: 1}}, 2, 0},
|
||||
{[]uint64{2}, []pb.Entry{{}, {Term: 1}, {Term: 2}}, 2, 2},
|
||||
{[]uint64{1}, []pb.Entry{{}, {Term: 2}}, 2, 1},
|
||||
|
||||
// odd
|
||||
{[]int64{2, 1, 1}, []pb.Entry{{}, {Term: 1}, {Term: 2}}, 1, 1},
|
||||
{[]int64{2, 1, 1}, []pb.Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
|
||||
{[]int64{2, 1, 2}, []pb.Entry{{}, {Term: 1}, {Term: 2}}, 2, 2},
|
||||
{[]int64{2, 1, 2}, []pb.Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
|
||||
{[]uint64{2, 1, 1}, []pb.Entry{{}, {Term: 1}, {Term: 2}}, 1, 1},
|
||||
{[]uint64{2, 1, 1}, []pb.Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
|
||||
{[]uint64{2, 1, 2}, []pb.Entry{{}, {Term: 1}, {Term: 2}}, 2, 2},
|
||||
{[]uint64{2, 1, 2}, []pb.Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
|
||||
|
||||
// even
|
||||
{[]int64{2, 1, 1, 1}, []pb.Entry{{}, {Term: 1}, {Term: 2}}, 1, 1},
|
||||
{[]int64{2, 1, 1, 1}, []pb.Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
|
||||
{[]int64{2, 1, 1, 2}, []pb.Entry{{}, {Term: 1}, {Term: 2}}, 1, 1},
|
||||
{[]int64{2, 1, 1, 2}, []pb.Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
|
||||
{[]int64{2, 1, 2, 2}, []pb.Entry{{}, {Term: 1}, {Term: 2}}, 2, 2},
|
||||
{[]int64{2, 1, 2, 2}, []pb.Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
|
||||
{[]uint64{2, 1, 1, 1}, []pb.Entry{{}, {Term: 1}, {Term: 2}}, 1, 1},
|
||||
{[]uint64{2, 1, 1, 1}, []pb.Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
|
||||
{[]uint64{2, 1, 1, 2}, []pb.Entry{{}, {Term: 1}, {Term: 2}}, 1, 1},
|
||||
{[]uint64{2, 1, 1, 2}, []pb.Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
|
||||
{[]uint64{2, 1, 2, 2}, []pb.Entry{{}, {Term: 1}, {Term: 2}}, 2, 2},
|
||||
{[]uint64{2, 1, 2, 2}, []pb.Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
|
||||
}
|
||||
|
||||
for i, tt := range tests {
|
||||
prs := make(map[int64]*progress)
|
||||
prs := make(map[uint64]*progress)
|
||||
for j := 0; j < len(tt.matches); j++ {
|
||||
prs[int64(j)] = &progress{tt.matches[j], tt.matches[j] + 1}
|
||||
prs[uint64(j)] = &progress{tt.matches[j], tt.matches[j] + 1}
|
||||
}
|
||||
sm := &raft{raftLog: &raftLog{ents: tt.logs}, prs: prs, HardState: pb.HardState{Term: tt.smTerm}}
|
||||
sm.maybeCommit()
|
||||
|
@ -517,7 +517,7 @@ func TestIsElectionTimeout(t *testing.T) {
|
|||
}
|
||||
|
||||
for i, tt := range tests {
|
||||
sm := newRaft(1, []int64{1}, 10, 1)
|
||||
sm := newRaft(1, []uint64{1}, 10, 1)
|
||||
sm.elapsed = tt.elapse
|
||||
c := 0
|
||||
for j := 0; j < 10000; j++ {
|
||||
|
@ -542,7 +542,7 @@ func TestStepIgnoreOldTermMsg(t *testing.T) {
|
|||
fakeStep := func(r *raft, m pb.Message) {
|
||||
called = true
|
||||
}
|
||||
sm := newRaft(1, []int64{1}, 10, 1)
|
||||
sm := newRaft(1, []uint64{1}, 10, 1)
|
||||
sm.step = fakeStep
|
||||
sm.Term = 2
|
||||
sm.Step(pb.Message{Type: msgApp, Term: sm.Term - 1})
|
||||
|
@ -559,8 +559,8 @@ func TestStepIgnoreOldTermMsg(t *testing.T) {
|
|||
func TestHandleMsgApp(t *testing.T) {
|
||||
tests := []struct {
|
||||
m pb.Message
|
||||
wIndex int64
|
||||
wCommit int64
|
||||
wIndex uint64
|
||||
wCommit uint64
|
||||
wReject bool
|
||||
}{
|
||||
// Ensure 1
|
||||
|
@ -608,8 +608,8 @@ func TestHandleMsgApp(t *testing.T) {
|
|||
func TestRecvMsgVote(t *testing.T) {
|
||||
tests := []struct {
|
||||
state StateType
|
||||
i, term int64
|
||||
voteFor int64
|
||||
i, term uint64
|
||||
voteFor uint64
|
||||
wreject bool
|
||||
}{
|
||||
{StateFollower, 0, 0, None, true},
|
||||
|
@ -640,7 +640,7 @@ func TestRecvMsgVote(t *testing.T) {
|
|||
}
|
||||
|
||||
for i, tt := range tests {
|
||||
sm := newRaft(1, []int64{1}, 10, 1)
|
||||
sm := newRaft(1, []uint64{1}, 10, 1)
|
||||
sm.state = tt.state
|
||||
switch tt.state {
|
||||
case StateFollower:
|
||||
|
@ -671,12 +671,12 @@ func TestStateTransition(t *testing.T) {
|
|||
from StateType
|
||||
to StateType
|
||||
wallow bool
|
||||
wterm int64
|
||||
wlead int64
|
||||
wterm uint64
|
||||
wlead uint64
|
||||
}{
|
||||
{StateFollower, StateFollower, true, 1, None},
|
||||
{StateFollower, StateCandidate, true, 1, None},
|
||||
{StateFollower, StateLeader, false, -1, None},
|
||||
{StateFollower, StateLeader, false, 0, None},
|
||||
|
||||
{StateCandidate, StateFollower, true, 0, None},
|
||||
{StateCandidate, StateCandidate, true, 1, None},
|
||||
|
@ -697,7 +697,7 @@ func TestStateTransition(t *testing.T) {
|
|||
}
|
||||
}()
|
||||
|
||||
sm := newRaft(1, []int64{1}, 10, 1)
|
||||
sm := newRaft(1, []uint64{1}, 10, 1)
|
||||
sm.state = tt.from
|
||||
|
||||
switch tt.to {
|
||||
|
@ -724,19 +724,19 @@ func TestAllServerStepdown(t *testing.T) {
|
|||
state StateType
|
||||
|
||||
wstate StateType
|
||||
wterm int64
|
||||
windex int64
|
||||
wterm uint64
|
||||
windex uint64
|
||||
}{
|
||||
{StateFollower, StateFollower, 3, 1},
|
||||
{StateCandidate, StateFollower, 3, 1},
|
||||
{StateLeader, StateFollower, 3, 2},
|
||||
}
|
||||
|
||||
tmsgTypes := [...]int64{msgVote, msgApp}
|
||||
tterm := int64(3)
|
||||
tmsgTypes := [...]uint64{msgVote, msgApp}
|
||||
tterm := uint64(3)
|
||||
|
||||
for i, tt := range tests {
|
||||
sm := newRaft(1, []int64{1, 2, 3}, 10, 1)
|
||||
sm := newRaft(1, []uint64{1, 2, 3}, 10, 1)
|
||||
switch tt.state {
|
||||
case StateFollower:
|
||||
sm.becomeFollower(1, None)
|
||||
|
@ -756,10 +756,10 @@ func TestAllServerStepdown(t *testing.T) {
|
|||
if sm.Term != tt.wterm {
|
||||
t.Errorf("#%d.%d term = %v , want %v", i, j, sm.Term, tt.wterm)
|
||||
}
|
||||
if int64(len(sm.raftLog.ents)) != tt.windex {
|
||||
if uint64(len(sm.raftLog.ents)) != tt.windex {
|
||||
t.Errorf("#%d.%d index = %v , want %v", i, j, len(sm.raftLog.ents), tt.windex)
|
||||
}
|
||||
wlead := int64(2)
|
||||
wlead := uint64(2)
|
||||
if msgType == msgVote {
|
||||
wlead = None
|
||||
}
|
||||
|
@ -772,11 +772,11 @@ func TestAllServerStepdown(t *testing.T) {
|
|||
|
||||
func TestLeaderAppResp(t *testing.T) {
|
||||
tests := []struct {
|
||||
index int64
|
||||
index uint64
|
||||
reject bool
|
||||
wmsgNum int
|
||||
windex int64
|
||||
wcommitted int64
|
||||
windex uint64
|
||||
wcommitted uint64
|
||||
}{
|
||||
{3, true, 0, 0, 0}, // stale resp; no replies
|
||||
{2, true, 1, 1, 0}, // denied resp; leader does not commit; decrese next and send probing msg
|
||||
|
@ -786,7 +786,7 @@ func TestLeaderAppResp(t *testing.T) {
|
|||
for i, tt := range tests {
|
||||
// sm term is 1 after it becomes the leader.
|
||||
// thus the last log term must be 1 to be committed.
|
||||
sm := newRaft(1, []int64{1, 2, 3}, 10, 1)
|
||||
sm := newRaft(1, []uint64{1, 2, 3}, 10, 1)
|
||||
sm.raftLog = &raftLog{ents: []pb.Entry{{}, {Term: 0}, {Term: 1}}}
|
||||
sm.becomeCandidate()
|
||||
sm.becomeLeader()
|
||||
|
@ -811,14 +811,14 @@ func TestLeaderAppResp(t *testing.T) {
|
|||
// When the leader receives a heartbeat tick, it should
|
||||
// send a msgApp with m.Index = 0, m.LogTerm=0 and empty entries.
|
||||
func TestBcastBeat(t *testing.T) {
|
||||
offset := int64(1000)
|
||||
offset := uint64(1000)
|
||||
// make a state machine with log.offset = 1000
|
||||
s := pb.Snapshot{
|
||||
Index: offset,
|
||||
Term: 1,
|
||||
Nodes: []int64{1, 2, 3},
|
||||
Nodes: []uint64{1, 2, 3},
|
||||
}
|
||||
sm := newRaft(1, []int64{1, 2, 3}, 10, 1)
|
||||
sm := newRaft(1, []uint64{1, 2, 3}, 10, 1)
|
||||
sm.Term = 1
|
||||
sm.restore(s)
|
||||
|
||||
|
@ -833,7 +833,7 @@ func TestBcastBeat(t *testing.T) {
|
|||
if len(msgs) != 2 {
|
||||
t.Fatalf("len(msgs) = %v, want 1", len(msgs))
|
||||
}
|
||||
tomap := map[int64]bool{2: true, 3: true}
|
||||
tomap := map[uint64]bool{2: true, 3: true}
|
||||
for i, m := range msgs {
|
||||
if m.Type != msgApp {
|
||||
t.Fatalf("#%d: type = %v, want = %v", i, m.Type, msgApp)
|
||||
|
@ -868,7 +868,7 @@ func TestRecvMsgBeat(t *testing.T) {
|
|||
}
|
||||
|
||||
for i, tt := range tests {
|
||||
sm := newRaft(1, []int64{1, 2, 3}, 10, 1)
|
||||
sm := newRaft(1, []uint64{1, 2, 3}, 10, 1)
|
||||
sm.raftLog = &raftLog{ents: []pb.Entry{{}, {Term: 0}, {Term: 1}}}
|
||||
sm.Term = 1
|
||||
sm.state = tt.state
|
||||
|
@ -898,11 +898,11 @@ func TestRestore(t *testing.T) {
|
|||
s := pb.Snapshot{
|
||||
Index: defaultCompactThreshold + 1,
|
||||
Term: defaultCompactThreshold + 1,
|
||||
Nodes: []int64{1, 2, 3},
|
||||
RemovedNodes: []int64{4, 5},
|
||||
Nodes: []uint64{1, 2, 3},
|
||||
RemovedNodes: []uint64{4, 5},
|
||||
}
|
||||
|
||||
sm := newRaft(1, []int64{1, 2}, 10, 1)
|
||||
sm := newRaft(1, []uint64{1, 2}, 10, 1)
|
||||
if ok := sm.restore(s); !ok {
|
||||
t.Fatal("restore fail, want succeed")
|
||||
}
|
||||
|
@ -915,8 +915,8 @@ func TestRestore(t *testing.T) {
|
|||
}
|
||||
sg := sm.nodes()
|
||||
srn := sm.removedNodes()
|
||||
sort.Sort(int64Slice(sg))
|
||||
sort.Sort(int64Slice(srn))
|
||||
sort.Sort(uint64Slice(sg))
|
||||
sort.Sort(uint64Slice(srn))
|
||||
if !reflect.DeepEqual(sg, s.Nodes) {
|
||||
t.Errorf("sm.Nodes = %+v, want %+v", sg, s.Nodes)
|
||||
}
|
||||
|
@ -936,9 +936,9 @@ func TestProvideSnap(t *testing.T) {
|
|||
s := pb.Snapshot{
|
||||
Index: defaultCompactThreshold + 1,
|
||||
Term: defaultCompactThreshold + 1,
|
||||
Nodes: []int64{1, 2},
|
||||
Nodes: []uint64{1, 2},
|
||||
}
|
||||
sm := newRaft(1, []int64{1}, 10, 1)
|
||||
sm := newRaft(1, []uint64{1}, 10, 1)
|
||||
// restore the statemachin from a snapshot
|
||||
// so it has a compacted log and a snapshot
|
||||
sm.restore(s)
|
||||
|
@ -965,11 +965,11 @@ func TestRestoreFromSnapMsg(t *testing.T) {
|
|||
s := pb.Snapshot{
|
||||
Index: defaultCompactThreshold + 1,
|
||||
Term: defaultCompactThreshold + 1,
|
||||
Nodes: []int64{1, 2},
|
||||
Nodes: []uint64{1, 2},
|
||||
}
|
||||
m := pb.Message{Type: msgSnap, From: 1, Term: 2, Snapshot: s}
|
||||
|
||||
sm := newRaft(2, []int64{1, 2}, 10, 1)
|
||||
sm := newRaft(2, []uint64{1, 2}, 10, 1)
|
||||
sm.Step(m)
|
||||
|
||||
if !reflect.DeepEqual(sm.raftLog.snapshot, s) {
|
||||
|
@ -1008,7 +1008,7 @@ func TestSlowNodeRestore(t *testing.T) {
|
|||
// it appends the entry to log and sets pendingConf to be true.
|
||||
func TestStepConfig(t *testing.T) {
|
||||
// a raft that cannot make progress
|
||||
r := newRaft(1, []int64{1, 2}, 10, 1)
|
||||
r := newRaft(1, []uint64{1, 2}, 10, 1)
|
||||
r.becomeCandidate()
|
||||
r.becomeLeader()
|
||||
index := r.raftLog.lastIndex()
|
||||
|
@ -1026,7 +1026,7 @@ func TestStepConfig(t *testing.T) {
|
|||
// the proposal and keep its original state.
|
||||
func TestStepIgnoreConfig(t *testing.T) {
|
||||
// a raft that cannot make progress
|
||||
r := newRaft(1, []int64{1, 2}, 10, 1)
|
||||
r := newRaft(1, []uint64{1, 2}, 10, 1)
|
||||
r.becomeCandidate()
|
||||
r.becomeLeader()
|
||||
r.Step(pb.Message{From: 1, To: 1, Type: msgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}})
|
||||
|
@ -1052,7 +1052,7 @@ func TestRecoverPendingConfig(t *testing.T) {
|
|||
{pb.EntryConfChange, true},
|
||||
}
|
||||
for i, tt := range tests {
|
||||
r := newRaft(1, []int64{1, 2}, 10, 1)
|
||||
r := newRaft(1, []uint64{1, 2}, 10, 1)
|
||||
r.appendEntry(pb.Entry{Type: tt.entType})
|
||||
r.becomeCandidate()
|
||||
r.becomeLeader()
|
||||
|
@ -1071,7 +1071,7 @@ func TestRecoverDoublePendingConfig(t *testing.T) {
|
|||
t.Errorf("expect panic, but nothing happens")
|
||||
}
|
||||
}()
|
||||
r := newRaft(1, []int64{1, 2}, 10, 1)
|
||||
r := newRaft(1, []uint64{1, 2}, 10, 1)
|
||||
r.appendEntry(pb.Entry{Type: pb.EntryConfChange})
|
||||
r.appendEntry(pb.Entry{Type: pb.EntryConfChange})
|
||||
r.becomeCandidate()
|
||||
|
@ -1081,15 +1081,15 @@ func TestRecoverDoublePendingConfig(t *testing.T) {
|
|||
|
||||
// TestAddNode tests that addNode could update pendingConf and nodes correctly.
|
||||
func TestAddNode(t *testing.T) {
|
||||
r := newRaft(1, []int64{1}, 10, 1)
|
||||
r := newRaft(1, []uint64{1}, 10, 1)
|
||||
r.pendingConf = true
|
||||
r.addNode(2)
|
||||
if r.pendingConf != false {
|
||||
t.Errorf("pendingConf = %v, want false", r.pendingConf)
|
||||
}
|
||||
nodes := r.nodes()
|
||||
sort.Sort(int64Slice(nodes))
|
||||
wnodes := []int64{1, 2}
|
||||
sort.Sort(uint64Slice(nodes))
|
||||
wnodes := []uint64{1, 2}
|
||||
if !reflect.DeepEqual(nodes, wnodes) {
|
||||
t.Errorf("nodes = %v, want %v", nodes, wnodes)
|
||||
}
|
||||
|
@ -1098,17 +1098,17 @@ func TestAddNode(t *testing.T) {
|
|||
// TestRemoveNode tests that removeNode could update pendingConf, nodes and
|
||||
// and removed list correctly.
|
||||
func TestRemoveNode(t *testing.T) {
|
||||
r := newRaft(1, []int64{1, 2}, 10, 1)
|
||||
r := newRaft(1, []uint64{1, 2}, 10, 1)
|
||||
r.pendingConf = true
|
||||
r.removeNode(2)
|
||||
if r.pendingConf != false {
|
||||
t.Errorf("pendingConf = %v, want false", r.pendingConf)
|
||||
}
|
||||
w := []int64{1}
|
||||
w := []uint64{1}
|
||||
if g := r.nodes(); !reflect.DeepEqual(g, w) {
|
||||
t.Errorf("nodes = %v, want %v", g, w)
|
||||
}
|
||||
wremoved := map[int64]bool{2: true}
|
||||
wremoved := map[uint64]bool{2: true}
|
||||
if !reflect.DeepEqual(r.removed, wremoved) {
|
||||
t.Errorf("rmNodes = %v, want %v", r.removed, wremoved)
|
||||
}
|
||||
|
@ -1121,13 +1121,13 @@ func TestRecvMsgDenied(t *testing.T) {
|
|||
fakeStep := func(r *raft, m pb.Message) {
|
||||
called = true
|
||||
}
|
||||
r := newRaft(1, []int64{1, 2}, 10, 1)
|
||||
r := newRaft(1, []uint64{1, 2}, 10, 1)
|
||||
r.step = fakeStep
|
||||
r.Step(pb.Message{From: 2, Type: msgDenied})
|
||||
if called != false {
|
||||
t.Errorf("stepFunc called = %v , want %v", called, false)
|
||||
}
|
||||
wremoved := map[int64]bool{1: true}
|
||||
wremoved := map[uint64]bool{1: true}
|
||||
if !reflect.DeepEqual(r.removed, wremoved) {
|
||||
t.Errorf("rmNodes = %v, want %v", r.removed, wremoved)
|
||||
}
|
||||
|
@ -1138,7 +1138,7 @@ func TestRecvMsgDenied(t *testing.T) {
|
|||
// pass it to the actual stepX function.
|
||||
func TestRecvMsgFromRemovedNode(t *testing.T) {
|
||||
tests := []struct {
|
||||
from int64
|
||||
from uint64
|
||||
wmsgNum int
|
||||
}{
|
||||
{1, 0},
|
||||
|
@ -1149,7 +1149,7 @@ func TestRecvMsgFromRemovedNode(t *testing.T) {
|
|||
fakeStep := func(r *raft, m pb.Message) {
|
||||
called = true
|
||||
}
|
||||
r := newRaft(1, []int64{1}, 10, 1)
|
||||
r := newRaft(1, []uint64{1}, 10, 1)
|
||||
r.step = fakeStep
|
||||
r.removeNode(tt.from)
|
||||
r.Step(pb.Message{From: tt.from, Type: msgVote})
|
||||
|
@ -1168,18 +1168,18 @@ func TestRecvMsgFromRemovedNode(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestPromotable(t *testing.T) {
|
||||
id := int64(1)
|
||||
id := uint64(1)
|
||||
tests := []struct {
|
||||
peers []int64
|
||||
peers []uint64
|
||||
wp bool
|
||||
}{
|
||||
{[]int64{1}, true},
|
||||
{[]int64{1, 2, 3}, true},
|
||||
{[]int64{}, false},
|
||||
{[]int64{2, 3}, false},
|
||||
{[]uint64{1}, true},
|
||||
{[]uint64{1, 2, 3}, true},
|
||||
{[]uint64{}, false},
|
||||
{[]uint64{2, 3}, false},
|
||||
}
|
||||
for i, tt := range tests {
|
||||
r := &raft{id: id, prs: make(map[int64]*progress)}
|
||||
r := &raft{id: id, prs: make(map[uint64]*progress)}
|
||||
for _, id := range tt.peers {
|
||||
r.prs[id] = &progress{}
|
||||
}
|
||||
|
@ -1189,7 +1189,7 @@ func TestPromotable(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func ents(terms ...int64) *raft {
|
||||
func ents(terms ...uint64) *raft {
|
||||
ents := []pb.Entry{{}}
|
||||
for _, term := range terms {
|
||||
ents = append(ents, pb.Entry{Term: term})
|
||||
|
@ -1201,9 +1201,9 @@ func ents(terms ...int64) *raft {
|
|||
}
|
||||
|
||||
type network struct {
|
||||
peers map[int64]Interface
|
||||
peers map[uint64]Interface
|
||||
dropm map[connem]float64
|
||||
ignorem map[int64]bool
|
||||
ignorem map[uint64]bool
|
||||
}
|
||||
|
||||
// newNetwork initializes a network from peers.
|
||||
|
@ -1212,12 +1212,12 @@ type network struct {
|
|||
// When using stateMachine, the address list is always [0, n).
|
||||
func newNetwork(peers ...Interface) *network {
|
||||
size := len(peers)
|
||||
peerAddrs := make([]int64, size)
|
||||
peerAddrs := make([]uint64, size)
|
||||
for i := 0; i < size; i++ {
|
||||
peerAddrs[i] = 1 + int64(i)
|
||||
peerAddrs[i] = 1 + uint64(i)
|
||||
}
|
||||
|
||||
npeers := make(map[int64]Interface, size)
|
||||
npeers := make(map[uint64]Interface, size)
|
||||
|
||||
for i, p := range peers {
|
||||
id := peerAddrs[i]
|
||||
|
@ -1227,7 +1227,7 @@ func newNetwork(peers ...Interface) *network {
|
|||
npeers[id] = sm
|
||||
case *raft:
|
||||
v.id = id
|
||||
v.prs = make(map[int64]*progress)
|
||||
v.prs = make(map[uint64]*progress)
|
||||
for i := 0; i < size; i++ {
|
||||
v.prs[peerAddrs[i]] = &progress{}
|
||||
}
|
||||
|
@ -1242,7 +1242,7 @@ func newNetwork(peers ...Interface) *network {
|
|||
return &network{
|
||||
peers: npeers,
|
||||
dropm: make(map[connem]float64),
|
||||
ignorem: make(map[int64]bool),
|
||||
ignorem: make(map[uint64]bool),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1255,18 +1255,18 @@ func (nw *network) send(msgs ...pb.Message) {
|
|||
}
|
||||
}
|
||||
|
||||
func (nw *network) drop(from, to int64, perc float64) {
|
||||
func (nw *network) drop(from, to uint64, perc float64) {
|
||||
nw.dropm[connem{from, to}] = perc
|
||||
}
|
||||
|
||||
func (nw *network) cut(one, other int64) {
|
||||
func (nw *network) cut(one, other uint64) {
|
||||
nw.drop(one, other, 1)
|
||||
nw.drop(other, one, 1)
|
||||
}
|
||||
|
||||
func (nw *network) isolate(id int64) {
|
||||
func (nw *network) isolate(id uint64) {
|
||||
for i := 0; i < len(nw.peers); i++ {
|
||||
nid := int64(i) + 1
|
||||
nid := uint64(i) + 1
|
||||
if nid != id {
|
||||
nw.drop(id, nid, 1.0)
|
||||
nw.drop(nid, id, 1.0)
|
||||
|
@ -1274,13 +1274,13 @@ func (nw *network) isolate(id int64) {
|
|||
}
|
||||
}
|
||||
|
||||
func (nw *network) ignore(t int64) {
|
||||
func (nw *network) ignore(t uint64) {
|
||||
nw.ignorem[t] = true
|
||||
}
|
||||
|
||||
func (nw *network) recover() {
|
||||
nw.dropm = make(map[connem]float64)
|
||||
nw.ignorem = make(map[int64]bool)
|
||||
nw.ignorem = make(map[uint64]bool)
|
||||
}
|
||||
|
||||
func (nw *network) filter(msgs []pb.Message) []pb.Message {
|
||||
|
@ -1305,7 +1305,7 @@ func (nw *network) filter(msgs []pb.Message) []pb.Message {
|
|||
}
|
||||
|
||||
type connem struct {
|
||||
from, to int64
|
||||
from, to uint64
|
||||
}
|
||||
|
||||
type blackHole struct{}
|
||||
|
|
|
@ -99,7 +99,7 @@ func (x *ConfChangeType) UnmarshalJSON(data []byte) error {
|
|||
}
|
||||
|
||||
type Info struct {
|
||||
ID int64 `protobuf:"varint,1,req" json:"ID"`
|
||||
ID uint64 `protobuf:"varint,1,req" json:"ID"`
|
||||
XXX_unrecognized []byte `json:"-"`
|
||||
}
|
||||
|
||||
|
@ -109,8 +109,8 @@ func (*Info) ProtoMessage() {}
|
|||
|
||||
type Entry struct {
|
||||
Type EntryType `protobuf:"varint,1,req,enum=raftpb.EntryType" json:"Type"`
|
||||
Term int64 `protobuf:"varint,2,req" json:"Term"`
|
||||
Index int64 `protobuf:"varint,3,req" json:"Index"`
|
||||
Term uint64 `protobuf:"varint,2,req" json:"Term"`
|
||||
Index uint64 `protobuf:"varint,3,req" json:"Index"`
|
||||
Data []byte `protobuf:"bytes,4,opt" json:"Data"`
|
||||
XXX_unrecognized []byte `json:"-"`
|
||||
}
|
||||
|
@ -120,12 +120,12 @@ func (m *Entry) String() string { return proto.CompactTextString(m) }
|
|||
func (*Entry) ProtoMessage() {}
|
||||
|
||||
type Snapshot struct {
|
||||
Data []byte `protobuf:"bytes,1,req,name=data" json:"data"`
|
||||
Nodes []int64 `protobuf:"varint,2,rep,name=nodes" json:"nodes"`
|
||||
Index int64 `protobuf:"varint,3,req,name=index" json:"index"`
|
||||
Term int64 `protobuf:"varint,4,req,name=term" json:"term"`
|
||||
RemovedNodes []int64 `protobuf:"varint,5,rep,name=removed_nodes" json:"removed_nodes"`
|
||||
XXX_unrecognized []byte `json:"-"`
|
||||
Data []byte `protobuf:"bytes,1,req,name=data" json:"data"`
|
||||
Nodes []uint64 `protobuf:"varint,2,rep,name=nodes" json:"nodes"`
|
||||
Index uint64 `protobuf:"varint,3,req,name=index" json:"index"`
|
||||
Term uint64 `protobuf:"varint,4,req,name=term" json:"term"`
|
||||
RemovedNodes []uint64 `protobuf:"varint,5,rep,name=removed_nodes" json:"removed_nodes"`
|
||||
XXX_unrecognized []byte `json:"-"`
|
||||
}
|
||||
|
||||
func (m *Snapshot) Reset() { *m = Snapshot{} }
|
||||
|
@ -133,14 +133,14 @@ func (m *Snapshot) String() string { return proto.CompactTextString(m) }
|
|||
func (*Snapshot) ProtoMessage() {}
|
||||
|
||||
type Message struct {
|
||||
Type int64 `protobuf:"varint,1,req,name=type" json:"type"`
|
||||
To int64 `protobuf:"varint,2,req,name=to" json:"to"`
|
||||
From int64 `protobuf:"varint,3,req,name=from" json:"from"`
|
||||
Term int64 `protobuf:"varint,4,req,name=term" json:"term"`
|
||||
LogTerm int64 `protobuf:"varint,5,req,name=logTerm" json:"logTerm"`
|
||||
Index int64 `protobuf:"varint,6,req,name=index" json:"index"`
|
||||
Type uint64 `protobuf:"varint,1,req,name=type" json:"type"`
|
||||
To uint64 `protobuf:"varint,2,req,name=to" json:"to"`
|
||||
From uint64 `protobuf:"varint,3,req,name=from" json:"from"`
|
||||
Term uint64 `protobuf:"varint,4,req,name=term" json:"term"`
|
||||
LogTerm uint64 `protobuf:"varint,5,req,name=logTerm" json:"logTerm"`
|
||||
Index uint64 `protobuf:"varint,6,req,name=index" json:"index"`
|
||||
Entries []Entry `protobuf:"bytes,7,rep,name=entries" json:"entries"`
|
||||
Commit int64 `protobuf:"varint,8,req,name=commit" json:"commit"`
|
||||
Commit uint64 `protobuf:"varint,8,req,name=commit" json:"commit"`
|
||||
Snapshot Snapshot `protobuf:"bytes,9,req,name=snapshot" json:"snapshot"`
|
||||
Reject bool `protobuf:"varint,10,req,name=reject" json:"reject"`
|
||||
XXX_unrecognized []byte `json:"-"`
|
||||
|
@ -151,9 +151,9 @@ func (m *Message) String() string { return proto.CompactTextString(m) }
|
|||
func (*Message) ProtoMessage() {}
|
||||
|
||||
type HardState struct {
|
||||
Term int64 `protobuf:"varint,1,req,name=term" json:"term"`
|
||||
Vote int64 `protobuf:"varint,2,req,name=vote" json:"vote"`
|
||||
Commit int64 `protobuf:"varint,3,req,name=commit" json:"commit"`
|
||||
Term uint64 `protobuf:"varint,1,req,name=term" json:"term"`
|
||||
Vote uint64 `protobuf:"varint,2,req,name=vote" json:"vote"`
|
||||
Commit uint64 `protobuf:"varint,3,req,name=commit" json:"commit"`
|
||||
XXX_unrecognized []byte `json:"-"`
|
||||
}
|
||||
|
||||
|
@ -162,9 +162,9 @@ func (m *HardState) String() string { return proto.CompactTextString(m) }
|
|||
func (*HardState) ProtoMessage() {}
|
||||
|
||||
type ConfChange struct {
|
||||
ID int64 `protobuf:"varint,1,req" json:"ID"`
|
||||
ID uint64 `protobuf:"varint,1,req" json:"ID"`
|
||||
Type ConfChangeType `protobuf:"varint,2,req,enum=raftpb.ConfChangeType" json:"Type"`
|
||||
NodeID int64 `protobuf:"varint,3,req" json:"NodeID"`
|
||||
NodeID uint64 `protobuf:"varint,3,req" json:"NodeID"`
|
||||
Context []byte `protobuf:"bytes,4,opt" json:"Context"`
|
||||
XXX_unrecognized []byte `json:"-"`
|
||||
}
|
||||
|
@ -206,7 +206,7 @@ func (m *Info) Unmarshal(data []byte) error {
|
|||
}
|
||||
b := data[index]
|
||||
index++
|
||||
m.ID |= (int64(b) & 0x7F) << shift
|
||||
m.ID |= (uint64(b) & 0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
|
@ -278,7 +278,7 @@ func (m *Entry) Unmarshal(data []byte) error {
|
|||
}
|
||||
b := data[index]
|
||||
index++
|
||||
m.Term |= (int64(b) & 0x7F) << shift
|
||||
m.Term |= (uint64(b) & 0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
|
@ -293,7 +293,7 @@ func (m *Entry) Unmarshal(data []byte) error {
|
|||
}
|
||||
b := data[index]
|
||||
index++
|
||||
m.Index |= (int64(b) & 0x7F) << shift
|
||||
m.Index |= (uint64(b) & 0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
|
@ -388,14 +388,14 @@ func (m *Snapshot) Unmarshal(data []byte) error {
|
|||
if wireType != 0 {
|
||||
return code_google_com_p_gogoprotobuf_proto.ErrWrongType
|
||||
}
|
||||
var v int64
|
||||
var v uint64
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if index >= l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
b := data[index]
|
||||
index++
|
||||
v |= (int64(b) & 0x7F) << shift
|
||||
v |= (uint64(b) & 0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
|
@ -411,7 +411,7 @@ func (m *Snapshot) Unmarshal(data []byte) error {
|
|||
}
|
||||
b := data[index]
|
||||
index++
|
||||
m.Index |= (int64(b) & 0x7F) << shift
|
||||
m.Index |= (uint64(b) & 0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
|
@ -426,7 +426,7 @@ func (m *Snapshot) Unmarshal(data []byte) error {
|
|||
}
|
||||
b := data[index]
|
||||
index++
|
||||
m.Term |= (int64(b) & 0x7F) << shift
|
||||
m.Term |= (uint64(b) & 0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
|
@ -435,14 +435,14 @@ func (m *Snapshot) Unmarshal(data []byte) error {
|
|||
if wireType != 0 {
|
||||
return code_google_com_p_gogoprotobuf_proto.ErrWrongType
|
||||
}
|
||||
var v int64
|
||||
var v uint64
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if index >= l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
b := data[index]
|
||||
index++
|
||||
v |= (int64(b) & 0x7F) << shift
|
||||
v |= (uint64(b) & 0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
|
@ -500,7 +500,7 @@ func (m *Message) Unmarshal(data []byte) error {
|
|||
}
|
||||
b := data[index]
|
||||
index++
|
||||
m.Type |= (int64(b) & 0x7F) << shift
|
||||
m.Type |= (uint64(b) & 0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
|
@ -515,7 +515,7 @@ func (m *Message) Unmarshal(data []byte) error {
|
|||
}
|
||||
b := data[index]
|
||||
index++
|
||||
m.To |= (int64(b) & 0x7F) << shift
|
||||
m.To |= (uint64(b) & 0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
|
@ -530,7 +530,7 @@ func (m *Message) Unmarshal(data []byte) error {
|
|||
}
|
||||
b := data[index]
|
||||
index++
|
||||
m.From |= (int64(b) & 0x7F) << shift
|
||||
m.From |= (uint64(b) & 0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
|
@ -545,7 +545,7 @@ func (m *Message) Unmarshal(data []byte) error {
|
|||
}
|
||||
b := data[index]
|
||||
index++
|
||||
m.Term |= (int64(b) & 0x7F) << shift
|
||||
m.Term |= (uint64(b) & 0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
|
@ -560,7 +560,7 @@ func (m *Message) Unmarshal(data []byte) error {
|
|||
}
|
||||
b := data[index]
|
||||
index++
|
||||
m.LogTerm |= (int64(b) & 0x7F) << shift
|
||||
m.LogTerm |= (uint64(b) & 0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
|
@ -575,7 +575,7 @@ func (m *Message) Unmarshal(data []byte) error {
|
|||
}
|
||||
b := data[index]
|
||||
index++
|
||||
m.Index |= (int64(b) & 0x7F) << shift
|
||||
m.Index |= (uint64(b) & 0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
|
@ -613,7 +613,7 @@ func (m *Message) Unmarshal(data []byte) error {
|
|||
}
|
||||
b := data[index]
|
||||
index++
|
||||
m.Commit |= (int64(b) & 0x7F) << shift
|
||||
m.Commit |= (uint64(b) & 0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
|
@ -711,7 +711,7 @@ func (m *HardState) Unmarshal(data []byte) error {
|
|||
}
|
||||
b := data[index]
|
||||
index++
|
||||
m.Term |= (int64(b) & 0x7F) << shift
|
||||
m.Term |= (uint64(b) & 0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
|
@ -726,7 +726,7 @@ func (m *HardState) Unmarshal(data []byte) error {
|
|||
}
|
||||
b := data[index]
|
||||
index++
|
||||
m.Vote |= (int64(b) & 0x7F) << shift
|
||||
m.Vote |= (uint64(b) & 0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
|
@ -741,7 +741,7 @@ func (m *HardState) Unmarshal(data []byte) error {
|
|||
}
|
||||
b := data[index]
|
||||
index++
|
||||
m.Commit |= (int64(b) & 0x7F) << shift
|
||||
m.Commit |= (uint64(b) & 0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
|
@ -798,7 +798,7 @@ func (m *ConfChange) Unmarshal(data []byte) error {
|
|||
}
|
||||
b := data[index]
|
||||
index++
|
||||
m.ID |= (int64(b) & 0x7F) << shift
|
||||
m.ID |= (uint64(b) & 0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
|
@ -828,7 +828,7 @@ func (m *ConfChange) Unmarshal(data []byte) error {
|
|||
}
|
||||
b := data[index]
|
||||
index++
|
||||
m.NodeID |= (int64(b) & 0x7F) << shift
|
||||
m.NodeID |= (uint64(b) & 0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
|
|
|
@ -9,7 +9,7 @@ option (gogoproto.goproto_getters_all) = false;
|
|||
option (gogoproto.goproto_enum_prefix_all) = false;
|
||||
|
||||
message Info {
|
||||
required int64 ID = 1 [(gogoproto.nullable) = false];
|
||||
required uint64 ID = 1 [(gogoproto.nullable) = false];
|
||||
}
|
||||
|
||||
enum EntryType {
|
||||
|
@ -18,37 +18,37 @@ enum EntryType {
|
|||
}
|
||||
|
||||
message Entry {
|
||||
required EntryType Type = 1 [(gogoproto.nullable) = false];
|
||||
required int64 Term = 2 [(gogoproto.nullable) = false];
|
||||
required int64 Index = 3 [(gogoproto.nullable) = false];
|
||||
optional bytes Data = 4 [(gogoproto.nullable) = false];
|
||||
required EntryType Type = 1 [(gogoproto.nullable) = false];
|
||||
required uint64 Term = 2 [(gogoproto.nullable) = false];
|
||||
required uint64 Index = 3 [(gogoproto.nullable) = false];
|
||||
optional bytes Data = 4 [(gogoproto.nullable) = false];
|
||||
}
|
||||
|
||||
message Snapshot {
|
||||
required bytes data = 1 [(gogoproto.nullable) = false];
|
||||
repeated int64 nodes = 2 [(gogoproto.nullable) = false];
|
||||
required int64 index = 3 [(gogoproto.nullable) = false];
|
||||
required int64 term = 4 [(gogoproto.nullable) = false];
|
||||
repeated int64 removed_nodes = 5 [(gogoproto.nullable) = false];
|
||||
repeated uint64 nodes = 2 [(gogoproto.nullable) = false];
|
||||
required uint64 index = 3 [(gogoproto.nullable) = false];
|
||||
required uint64 term = 4 [(gogoproto.nullable) = false];
|
||||
repeated uint64 removed_nodes = 5 [(gogoproto.nullable) = false];
|
||||
}
|
||||
|
||||
message Message {
|
||||
required int64 type = 1 [(gogoproto.nullable) = false];
|
||||
required int64 to = 2 [(gogoproto.nullable) = false];
|
||||
required int64 from = 3 [(gogoproto.nullable) = false];
|
||||
required int64 term = 4 [(gogoproto.nullable) = false];
|
||||
required int64 logTerm = 5 [(gogoproto.nullable) = false];
|
||||
required int64 index = 6 [(gogoproto.nullable) = false];
|
||||
repeated Entry entries = 7 [(gogoproto.nullable) = false];
|
||||
required int64 commit = 8 [(gogoproto.nullable) = false];
|
||||
required Snapshot snapshot = 9 [(gogoproto.nullable) = false];
|
||||
required bool reject = 10 [(gogoproto.nullable) = false];
|
||||
required uint64 type = 1 [(gogoproto.nullable) = false];
|
||||
required uint64 to = 2 [(gogoproto.nullable) = false];
|
||||
required uint64 from = 3 [(gogoproto.nullable) = false];
|
||||
required uint64 term = 4 [(gogoproto.nullable) = false];
|
||||
required uint64 logTerm = 5 [(gogoproto.nullable) = false];
|
||||
required uint64 index = 6 [(gogoproto.nullable) = false];
|
||||
repeated Entry entries = 7 [(gogoproto.nullable) = false];
|
||||
required uint64 commit = 8 [(gogoproto.nullable) = false];
|
||||
required Snapshot snapshot = 9 [(gogoproto.nullable) = false];
|
||||
required bool reject = 10 [(gogoproto.nullable) = false];
|
||||
}
|
||||
|
||||
message HardState {
|
||||
required int64 term = 1 [(gogoproto.nullable) = false];
|
||||
required int64 vote = 2 [(gogoproto.nullable) = false];
|
||||
required int64 commit = 3 [(gogoproto.nullable) = false];
|
||||
required uint64 term = 1 [(gogoproto.nullable) = false];
|
||||
required uint64 vote = 2 [(gogoproto.nullable) = false];
|
||||
required uint64 commit = 3 [(gogoproto.nullable) = false];
|
||||
}
|
||||
|
||||
enum ConfChangeType {
|
||||
|
@ -57,8 +57,8 @@ enum ConfChangeType {
|
|||
}
|
||||
|
||||
message ConfChange {
|
||||
required int64 ID = 1 [(gogoproto.nullable) = false];
|
||||
required ConfChangeType Type = 2 [(gogoproto.nullable) = false];
|
||||
required int64 NodeID = 3 [(gogoproto.nullable) = false];
|
||||
optional bytes Context = 4 [(gogoproto.nullable) = false];
|
||||
required uint64 ID = 1 [(gogoproto.nullable) = false];
|
||||
required ConfChangeType Type = 2 [(gogoproto.nullable) = false];
|
||||
required uint64 NodeID = 3 [(gogoproto.nullable) = false];
|
||||
optional bytes Context = 4 [(gogoproto.nullable) = false];
|
||||
}
|
||||
|
|
|
@ -14,7 +14,7 @@ import (
|
|||
|
||||
var testSnap = &raftpb.Snapshot{
|
||||
Data: []byte("some snapshot"),
|
||||
Nodes: []int64{1, 2, 3},
|
||||
Nodes: []uint64{1, 2, 3},
|
||||
Index: 1,
|
||||
Term: 1,
|
||||
}
|
||||
|
|
|
@ -17,7 +17,7 @@ func Exist(dirpath string) bool {
|
|||
// searchIndex returns the last array index of names whose raft index section is
|
||||
// equal to or smaller than the given index.
|
||||
// The given names MUST be sorted.
|
||||
func searchIndex(names []string, index int64) (int, bool) {
|
||||
func searchIndex(names []string, index uint64) (int, bool) {
|
||||
for i := len(names) - 1; i >= 0; i-- {
|
||||
name := names[i]
|
||||
_, curIndex, err := parseWalName(name)
|
||||
|
@ -34,7 +34,7 @@ func searchIndex(names []string, index int64) (int, bool) {
|
|||
// names should have been sorted based on sequence number.
|
||||
// isValidSeq checks whether seq increases continuously.
|
||||
func isValidSeq(names []string) bool {
|
||||
var lastSeq int64
|
||||
var lastSeq uint64
|
||||
for _, name := range names {
|
||||
curSeq, _, err := parseWalName(name)
|
||||
if err != nil {
|
||||
|
@ -74,7 +74,7 @@ func checkWalNames(names []string) []string {
|
|||
return wnames
|
||||
}
|
||||
|
||||
func parseWalName(str string) (seq, index int64, err error) {
|
||||
func parseWalName(str string) (seq, index uint64, err error) {
|
||||
var num int
|
||||
num, err = fmt.Sscanf(str, "%016x-%016x.wal", &seq, &index)
|
||||
if num != 2 && err == nil {
|
||||
|
@ -83,7 +83,7 @@ func parseWalName(str string) (seq, index int64, err error) {
|
|||
return
|
||||
}
|
||||
|
||||
func walName(seq, index int64) string {
|
||||
func walName(seq, index uint64) string {
|
||||
return fmt.Sprintf("%016x-%016x.wal", seq, index)
|
||||
}
|
||||
|
||||
|
|
10
wal/wal.go
10
wal/wal.go
|
@ -56,12 +56,12 @@ var (
|
|||
type WAL struct {
|
||||
dir string // the living directory of the underlay files
|
||||
|
||||
ri int64 // index of entry to start reading
|
||||
ri uint64 // index of entry to start reading
|
||||
decoder *decoder // decoder to decode records
|
||||
|
||||
f *os.File // underlay file opened for appending, sync
|
||||
seq int64 // sequence of the wal file currently used for writes
|
||||
enti int64 // index of the last entry saved to the wal
|
||||
seq uint64 // sequence of the wal file currently used for writes
|
||||
enti uint64 // index of the last entry saved to the wal
|
||||
encoder *encoder // encoder to encode records
|
||||
}
|
||||
|
||||
|
@ -98,7 +98,7 @@ func Create(dirpath string) (*WAL, error) {
|
|||
// The returned WAL is ready to read and the first record will be the given
|
||||
// index. The WAL cannot be appended to before reading out all of its
|
||||
// previous records.
|
||||
func OpenAtIndex(dirpath string, index int64) (*WAL, error) {
|
||||
func OpenAtIndex(dirpath string, index uint64) (*WAL, error) {
|
||||
names, err := readDir(dirpath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -154,7 +154,7 @@ func OpenAtIndex(dirpath string, index int64) (*WAL, error) {
|
|||
// ReadAll reads out all records of the current WAL.
|
||||
// If it cannot read out the expected entry, it will return ErrIndexNotFound.
|
||||
// After ReadAll, the WAL will be ready for appending new records.
|
||||
func (w *WAL) ReadAll() (id int64, state raftpb.HardState, ents []raftpb.Entry, err error) {
|
||||
func (w *WAL) ReadAll() (id uint64, state raftpb.HardState, ents []raftpb.Entry, err error) {
|
||||
rec := &walpb.Record{}
|
||||
decoder := w.decoder
|
||||
|
||||
|
|
|
@ -165,7 +165,7 @@ func TestRecover(t *testing.T) {
|
|||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
i := &raftpb.Info{ID: int64(0xBAD0)}
|
||||
i := &raftpb.Info{ID: uint64(0xBAD0)}
|
||||
if err = w.SaveInfo(i); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -207,7 +207,7 @@ func TestRecover(t *testing.T) {
|
|||
func TestSearchIndex(t *testing.T) {
|
||||
tests := []struct {
|
||||
names []string
|
||||
index int64
|
||||
index uint64
|
||||
widx int
|
||||
wok bool
|
||||
}{
|
||||
|
@ -250,7 +250,7 @@ func TestSearchIndex(t *testing.T) {
|
|||
func TestScanWalName(t *testing.T) {
|
||||
tests := []struct {
|
||||
str string
|
||||
wseq, windex int64
|
||||
wseq, windex uint64
|
||||
wok bool
|
||||
}{
|
||||
{"0000000000000000-0000000000000000.wal", 0, 0, true},
|
||||
|
@ -282,7 +282,7 @@ func TestRecoverAfterCut(t *testing.T) {
|
|||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
info := &raftpb.Info{ID: int64(0xBAD1)}
|
||||
info := &raftpb.Info{ID: uint64(0xBAD1)}
|
||||
if err = w.SaveInfo(info); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -294,7 +294,7 @@ func TestRecoverAfterCut(t *testing.T) {
|
|||
t.Fatal(err)
|
||||
}
|
||||
for i := 1; i < 10; i++ {
|
||||
e := raftpb.Entry{Index: int64(i)}
|
||||
e := raftpb.Entry{Index: uint64(i)}
|
||||
if err = w.SaveEntry(&e); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -312,7 +312,7 @@ func TestRecoverAfterCut(t *testing.T) {
|
|||
}
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
w, err := OpenAtIndex(p, int64(i))
|
||||
w, err := OpenAtIndex(p, uint64(i))
|
||||
if err != nil {
|
||||
if i <= 4 {
|
||||
if err != ErrFileNotFound {
|
||||
|
@ -332,7 +332,7 @@ func TestRecoverAfterCut(t *testing.T) {
|
|||
t.Errorf("#%d: id = %d, want %d", i, id, info.ID)
|
||||
}
|
||||
for j, e := range entries {
|
||||
if e.Index != int64(j+i) {
|
||||
if e.Index != uint64(j+i) {
|
||||
t.Errorf("#%d: ents[%d].Index = %+v, want %+v", i, j, e.Index, j+i)
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue