*: add wal
parent
5ddfe18cda
commit
1a6e908971
40
main.go
40
main.go
|
@ -2,22 +2,26 @@ package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"flag"
|
"flag"
|
||||||
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"os"
|
||||||
|
"path"
|
||||||
"strconv"
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/coreos/etcd/etcdserver"
|
"github.com/coreos/etcd/etcdserver"
|
||||||
"github.com/coreos/etcd/etcdserver/etcdhttp"
|
"github.com/coreos/etcd/etcdserver/etcdhttp"
|
||||||
"github.com/coreos/etcd/raft"
|
"github.com/coreos/etcd/raft"
|
||||||
"github.com/coreos/etcd/raft/raftpb"
|
|
||||||
"github.com/coreos/etcd/store"
|
"github.com/coreos/etcd/store"
|
||||||
|
"github.com/coreos/etcd/wal"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
fid = flag.String("id", "0xBEEF", "Id of this server")
|
fid = flag.String("id", "0xBEEF", "Id of this server")
|
||||||
timeout = flag.Duration("timeout", 10*time.Second, "Request Timeout")
|
timeout = flag.Duration("timeout", 10*time.Second, "Request Timeout")
|
||||||
laddr = flag.String("l", ":8080", "HTTP service address (e.g., ':8080')")
|
laddr = flag.String("l", ":8080", "HTTP service address (e.g., ':8080')")
|
||||||
|
dir = flag.String("d", "", "Directory to store wal files and snapshot files")
|
||||||
|
|
||||||
peers = etcdhttp.Peers{}
|
peers = etcdhttp.Peers{}
|
||||||
)
|
)
|
||||||
|
@ -38,13 +42,43 @@ func main() {
|
||||||
log.Fatalf("%#x=<addr> must be specified in peers", id)
|
log.Fatalf("%#x=<addr> must be specified in peers", id)
|
||||||
}
|
}
|
||||||
|
|
||||||
n := raft.Start(id, peers.Ids(), 10, 1)
|
if *dir == "" {
|
||||||
|
*dir = fmt.Sprintf("%v", *fid)
|
||||||
|
log.Printf("main: no data dir is given, use default data dir ./%s", *dir)
|
||||||
|
}
|
||||||
|
if err := os.MkdirAll(*dir, 0700); err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
waldir := path.Join(*dir, "wal")
|
||||||
|
|
||||||
|
var w *wal.WAL
|
||||||
|
var n raft.Node
|
||||||
|
if wal.Exist(waldir) {
|
||||||
|
// TODO(xiangli): check snapshot; not open from zero
|
||||||
|
w, err = wal.OpenFromIndex(waldir, 0)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
// TODO(xiangli): save/recovery nodeID?
|
||||||
|
_, st, ents, err := w.ReadAll()
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
n = raft.Restart(id, peers.Ids(), 10, 1, st, ents)
|
||||||
|
} else {
|
||||||
|
w, err = wal.Create(waldir)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
n = raft.Start(id, peers.Ids(), 10, 1)
|
||||||
|
}
|
||||||
|
|
||||||
tk := time.NewTicker(100 * time.Millisecond)
|
tk := time.NewTicker(100 * time.Millisecond)
|
||||||
s := &etcdserver.Server{
|
s := &etcdserver.Server{
|
||||||
Store: store.New(),
|
Store: store.New(),
|
||||||
Node: n,
|
Node: n,
|
||||||
Save: func(st raftpb.State, ents []raftpb.Entry) {}, // TODO: use wal
|
Save: w.Save,
|
||||||
Send: etcdhttp.Sender(peers),
|
Send: etcdhttp.Sender(peers),
|
||||||
Ticker: tk.C,
|
Ticker: tk.C,
|
||||||
}
|
}
|
||||||
|
|
24
raft/node.go
24
raft/node.go
|
@ -48,7 +48,23 @@ type Node struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func Start(id int64, peers []int64, election, heartbeat int) Node {
|
func Start(id int64, peers []int64, election, heartbeat int) Node {
|
||||||
n := Node{
|
n := newNode()
|
||||||
|
r := newRaft(id, peers, election, heartbeat)
|
||||||
|
go n.run(r)
|
||||||
|
return n
|
||||||
|
}
|
||||||
|
|
||||||
|
func Restart(id int64, peers []int64, election, heartbeat int, st pb.State, ents []pb.Entry) Node {
|
||||||
|
n := newNode()
|
||||||
|
r := newRaft(id, peers, election, heartbeat)
|
||||||
|
r.loadState(st)
|
||||||
|
r.loadEnts(ents)
|
||||||
|
go n.run(r)
|
||||||
|
return n
|
||||||
|
}
|
||||||
|
|
||||||
|
func newNode() Node {
|
||||||
|
return Node{
|
||||||
propc: make(chan pb.Message),
|
propc: make(chan pb.Message),
|
||||||
recvc: make(chan pb.Message),
|
recvc: make(chan pb.Message),
|
||||||
readyc: make(chan Ready),
|
readyc: make(chan Ready),
|
||||||
|
@ -56,9 +72,6 @@ func Start(id int64, peers []int64, election, heartbeat int) Node {
|
||||||
alwaysreadyc: make(chan Ready),
|
alwaysreadyc: make(chan Ready),
|
||||||
done: make(chan struct{}),
|
done: make(chan struct{}),
|
||||||
}
|
}
|
||||||
r := newRaft(id, peers, election, heartbeat)
|
|
||||||
go n.run(r)
|
|
||||||
return n
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *Node) Stop() {
|
func (n *Node) Stop() {
|
||||||
|
@ -71,7 +84,8 @@ func (n *Node) run(r *raft) {
|
||||||
|
|
||||||
var lead int64
|
var lead int64
|
||||||
var prev Ready
|
var prev Ready
|
||||||
prev.Vote = none
|
prev.State = r.State
|
||||||
|
|
||||||
for {
|
for {
|
||||||
if lead != r.lead {
|
if lead != r.lead {
|
||||||
log.Printf("raft: leader changed from %#x to %#x", lead, r.lead)
|
log.Printf("raft: leader changed from %#x to %#x", lead, r.lead)
|
||||||
|
|
|
@ -42,3 +42,28 @@ func TestNode(t *testing.T) {
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestNodeRestart(t *testing.T) {
|
||||||
|
entries := []raftpb.Entry{
|
||||||
|
{Term: 1, Index: 1},
|
||||||
|
{Term: 1, Index: 2, Data: []byte("foo")},
|
||||||
|
}
|
||||||
|
st := raftpb.State{Term: 1, Vote: -1, Commit: 1, LastIndex: 2}
|
||||||
|
|
||||||
|
want := Ready{
|
||||||
|
State: raftpb.State{Term: 1, Vote: -1, Commit: 1, LastIndex: 2},
|
||||||
|
// commit upto index 1
|
||||||
|
CommittedEntries: []raftpb.Entry{{Term: 1, Index: 1}},
|
||||||
|
}
|
||||||
|
|
||||||
|
n := Restart(1, []int64{1}, 0, 0, st, entries)
|
||||||
|
if g := <-n.Ready(); !reflect.DeepEqual(g, want) {
|
||||||
|
t.Errorf("g = %+v,\n w %+v", g, want)
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case rd := <-n.Ready():
|
||||||
|
t.Errorf("unexpected Ready: %+v", rd)
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -512,4 +512,6 @@ func (r *raft) loadState(state pb.State) {
|
||||||
r.raftLog.committed = state.Commit
|
r.raftLog.committed = state.Commit
|
||||||
r.Term = state.Term
|
r.Term = state.Term
|
||||||
r.Vote = state.Vote
|
r.Vote = state.Vote
|
||||||
|
r.Commit = state.Commit
|
||||||
|
r.LastIndex = state.LastIndex
|
||||||
}
|
}
|
||||||
|
|
14
wal/wal.go
14
wal/wal.go
|
@ -66,6 +66,11 @@ func Create(dirpath string) (*WAL, error) {
|
||||||
if Exist(dirpath) {
|
if Exist(dirpath) {
|
||||||
return nil, os.ErrExist
|
return nil, os.ErrExist
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err := os.MkdirAll(dirpath, 0700); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
p := path.Join(dirpath, fmt.Sprintf("%016x-%016x.wal", 0, 0))
|
p := path.Join(dirpath, fmt.Sprintf("%016x-%016x.wal", 0, 0))
|
||||||
f, err := os.OpenFile(p, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0600)
|
f, err := os.OpenFile(p, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0600)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -258,6 +263,15 @@ func (w *WAL) SaveState(s *raftpb.State) error {
|
||||||
return w.encoder.encode(rec)
|
return w.encoder.encode(rec)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (w *WAL) Save(st raftpb.State, ents []raftpb.Entry) {
|
||||||
|
// TODO(xiangli): no addresses fly around
|
||||||
|
w.SaveState(&st)
|
||||||
|
for i := range ents {
|
||||||
|
w.SaveEntry(&ents[i])
|
||||||
|
}
|
||||||
|
w.Sync()
|
||||||
|
}
|
||||||
|
|
||||||
func (w *WAL) saveCrc(prevCrc uint32) error {
|
func (w *WAL) saveCrc(prevCrc uint32) error {
|
||||||
return w.encoder.encode(&walpb.Record{Type: crcType, Crc: prevCrc})
|
return w.encoder.encode(&walpb.Record{Type: crcType, Crc: prevCrc})
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue