Merge pull request #4011 from heyitsanthony/raftexample

contrib: example key-value store using raft
release-2.3
Xiang Li 2015-12-17 15:05:59 -08:00
commit bff857dc28
7 changed files with 514 additions and 0 deletions

View File

@ -3,3 +3,4 @@
Scripts and files which may be useful but aren't part of the core etcd project.
- [systemd](systemd) - an example unit file for deploying etcd on systemd-based distributions
- [raftexample](raftexample) - an example distributed key-value store using raft

View File

@ -0,0 +1,4 @@
# Use goreman to run `go get github.com/mattn/goreman`
raftexample1: ./raftexample --id 1 --cluster http://127.0.0.1:12379,http://127.0.0.1:22379,http://127.0.0.1:32379 --port 12380
raftexample2: ./raftexample --id 2 --cluster http://127.0.0.1:12379,http://127.0.0.1:22379,http://127.0.0.1:32379 --port 22380
raftexample3: ./raftexample --id 3 --cluster http://127.0.0.1:12379,http://127.0.0.1:22379,http://127.0.0.1:32379 --port 32380

View File

@ -0,0 +1,90 @@
# raftexample
raftexample is an example usage of etcd's [raft library](../../raft). It provides a simple REST API for a key-value store cluster backed by the [Raft][raft] consensus algorithm.
[raft]: http://raftconsensus.github.io/
## Getting Started
### Running single node raftexample
First start a single-member cluster of raftexample:
```sh
raftexample --id 1 --cluster http://127.0.0.1:12379 --port 12380
```
Each raftexample process maintains a single raft instance and a key-value server.
The process's list of comma separated peers (--cluster), its raft ID index into the peer list (--id), and http key-value server port (--port) are passed through the command line.
Next, store a value ("hello") to a key ("my-key"):
```
curl -L http://127.0.0.1:12380/my-key -XPUT -d hello
```
Finally, retrieve the stored key:
```
curl -L http://127.0.0.1:12380/my-key
```
### Running a local cluster
First install [goreman](https://github.com/mattn/goreman), which manages Procfile-based applications.
The [Procfile script](./Procfile) will set up a local example cluster. You can start it with:
```sh
goreman start
```
This will bring up three raftexample instances.
You can write a key-value pair to any member of the cluster and likewise retrieve it from any member.
### Fault Tolerance
To test cluster recovery, first start a cluster and write a value "foo":
```sh
goreman start
curl -L http://127.0.0.1:12380/my-key -XPUT -d foo
```
Next, remove a node and replace the value with "bar" to check cluster availability:
```sh
goreman run stop raftexample2
curl -L http://127.0.0.1:12380/my-key -XPUT -d bar
curl -L http://127.0.0.1:32380/my-key
```
Finally, bring the node back up and verify it recovers with the updated value "bar":
```sh
goreman run start raftexample2
curl -L http://127.0.0.1:22380/my-key
```
## Design
The raftexample consists of three components: a raft-backed key-value store, a REST API server, and a raft consensus server based on etcd's raft implementation.
The raft-backed key-value store is a key-value map that holds all committed key-values.
The store bridges communication between the raft server and the REST server.
Key-value updates are issued through the store to the raft server.
The store updates its map once raft reports the updates are committed.
The REST server exposes the current raft consensus by accessing the raft-backed key-value store.
A GET command looks up a key in the store and returns the value, if any.
A key-value PUT command issues an update proposal to the store.
The raft server participates in consensus with its cluster peers.
When the REST server submits a proposal, the raft server transmits the proposal to its peers.
When raft reaches a consensus, the server publishes all committed updates over a commit channel.
For raftexample, this commit channel is consumed by the key-value store.
## Project Details
### TODO
- Snapshot support
- Dynamic reconfiguration

View File

@ -0,0 +1,67 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"io/ioutil"
"log"
"net/http"
"strconv"
)
// Handler for a http based key-value store backed by raft
type httpKVAPI struct {
store *kvstore
}
func (h *httpKVAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) {
key := r.RequestURI
switch {
case r.Method == "PUT":
v, err := ioutil.ReadAll(r.Body)
if err != nil {
log.Printf("Failed to read on PUT (%v)\n", err)
http.Error(w, "Failed on PUT", http.StatusBadRequest)
return
}
h.store.Propose(key, string(v))
// Optimistic-- no waiting for ack from raft. Value is not yet
// committed so a subsequent GET on the key may return old value
w.WriteHeader(http.StatusNoContent)
case r.Method == "GET":
if v, ok := h.store.Lookup(key); ok {
w.Write([]byte(v))
} else {
http.Error(w, "Failed to GET", http.StatusNotFound)
}
default:
w.Header().Set("Allow", "PUT")
w.Header().Add("Allow", "GET")
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
}
}
// serveHttpKVAPI starts a key-value server with a GET/PUT API and listens.
func serveHttpKVAPI(port int, proposeC chan<- string, commitC <-chan *string, errorC <-chan error) {
srv := http.Server{
Addr: ":" + strconv.Itoa(port),
Handler: &httpKVAPI{newKVStore(proposeC, commitC, errorC)},
}
if err := srv.ListenAndServe(); err != nil {
log.Fatal(err)
}
}

View File

@ -0,0 +1,82 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"bytes"
"encoding/gob"
"log"
"sync"
)
// a key-value store backed by raft
type kvstore struct {
proposeC chan<- string // channel for proposing updates
mu sync.RWMutex
kvStore map[string]string // current committed key-value pairs
}
type kv struct {
Key string
Val string
}
func newKVStore(proposeC chan<- string, commitC <-chan *string, errorC <-chan error) *kvstore {
s := &kvstore{proposeC: proposeC, kvStore: make(map[string]string)}
// replay log into key-value map
s.readCommits(commitC, errorC)
// read commits from raft into kvStore map until error
go s.readCommits(commitC, errorC)
return s
}
func (s *kvstore) Lookup(key string) (string, bool) {
s.mu.RLock()
v, ok := s.kvStore[key]
s.mu.RUnlock()
return v, ok
}
func (s *kvstore) Propose(k string, v string) {
var buf bytes.Buffer
if err := gob.NewEncoder(&buf).Encode(kv{k, v}); err != nil {
log.Fatal(err)
}
s.proposeC <- string(buf.Bytes())
}
func (s *kvstore) readCommits(commitC <-chan *string, errorC <-chan error) {
for {
select {
case data := <-commitC:
if data == nil {
// done replaying log; new data incoming
return
}
var data_kv kv
dec := gob.NewDecoder(bytes.NewBufferString(*data))
if err := dec.Decode(&data_kv); err != nil {
log.Fatalf("raftexample: could not decode message (%v)", err)
}
s.mu.Lock()
s.kvStore[data_kv.Key] = data_kv.Val
s.mu.Unlock()
case err := <-errorC:
log.Println(err)
return
}
}
}

View File

@ -0,0 +1,36 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"flag"
"strings"
)
func main() {
cluster := flag.String("cluster", "http://127.0.0.1:9021", "comma separated cluster peers")
id := flag.Int("id", 1, "node ID")
kvport := flag.Int("port", 9121, "key-value server port")
flag.Parse()
proposeC := make(chan string)
defer close(proposeC)
// raft provides a commit stream for the proposals from the http api
commitC, errorC := newRaftNode(*id, strings.Split(*cluster, ","), proposeC)
// the key-value http handler will propose updates to raft
serveHttpKVAPI(*kvport, proposeC, commitC, errorC)
}

234
contrib/raftexample/raft.go Normal file
View File

@ -0,0 +1,234 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"fmt"
"log"
"os"
"strconv"
"time"
"net/http"
"net/url"
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
"github.com/coreos/etcd/etcdserver/stats"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/rafthttp"
"github.com/coreos/etcd/wal"
"github.com/coreos/etcd/wal/walpb"
)
// A key-value stream backed by raft
type raftNode struct {
proposeC <-chan string // proposed messages (k,v)
commitC chan *string // entries committed to log (k,v)
errorC chan error // errors from raft session
id int // client ID for raft session
peers []string // raft peer URLs
waldir string // path to WAL directory
// raft backing for the commit/error channel
node raft.Node
raftStorage *raft.MemoryStorage
wal *wal.WAL
transport *rafthttp.Transport
}
// newRaftNode initiates a raft instance and returns a committed log entry
// channel and error channel. Proposals for log updates are sent over the
// provided the proposal channel. All log entries are replayed over the
// commit channel, followed by a nil message (to indicate the channel is
// current), then new log entries.
func newRaftNode(id int, peers []string, proposeC <-chan string) (<-chan *string, <-chan error) {
rc := &raftNode{
proposeC: proposeC,
commitC: make(chan *string),
errorC: make(chan error),
id: id,
peers: peers,
waldir: fmt.Sprintf("raftexample-%d", id),
raftStorage: raft.NewMemoryStorage(),
// rest of structure populated after WAL replay
}
go rc.startRaft()
return rc.commitC, rc.errorC
}
// publishEntries writes committed log entries to commit channel.
func (rc *raftNode) publishEntries(ents []raftpb.Entry) {
for i := range ents {
if ents[i].Type != raftpb.EntryNormal || len(ents[i].Data) == 0 {
// ignore conf changes and empty messages
continue
}
s := string(ents[i].Data)
rc.commitC <- &s
}
}
// openWAL returns a WAL ready for reading.
func (rc *raftNode) openWAL() *wal.WAL {
if wal.Exist(rc.waldir) == false {
if err := os.Mkdir(rc.waldir, 0750); err != nil {
log.Fatalf("raftexample: cannot create dir for wal (%v)", err)
}
w, err := wal.Create(rc.waldir, nil)
if err != nil {
log.Fatalf("raftexample: create wal error (%v)", err)
}
w.Close()
}
w, err := wal.Open(rc.waldir, walpb.Snapshot{})
if err != nil {
log.Fatalf("raftexample: error loading wal (%v)", err)
}
return w
}
// replayWAL replays WAL entries into the raft instance and the commit
// channel and returns an appendable WAL.
func (rc *raftNode) replayWAL() *wal.WAL {
w := rc.openWAL()
_, _, ents, err := w.ReadAll()
if err != nil {
log.Fatalf("raftexample: failed to read WAL (%v)", err)
}
// append to storage so raft starts at the right place in log
rc.raftStorage.Append(ents)
rc.publishEntries(ents)
// send nil value so client knows commit channel is current
rc.commitC <- nil
return w
}
func (rc *raftNode) writeError(err error) {
rc.errorC <- err
rc.stop()
}
func (rc *raftNode) stop() {
close(rc.commitC)
close(rc.errorC)
rc.node.Stop()
}
func (rc *raftNode) startRaft() {
oldwal := wal.Exist(rc.waldir)
rc.wal = rc.replayWAL()
rpeers := make([]raft.Peer, len(rc.peers))
for i := range rpeers {
rpeers[i] = raft.Peer{ID: uint64(i + 1)}
}
c := &raft.Config{
ID: uint64(rc.id),
ElectionTick: 10,
HeartbeatTick: 1,
Storage: rc.raftStorage,
MaxSizePerMsg: 1024 * 1024,
MaxInflightMsgs: 256,
}
if oldwal {
rc.node = raft.RestartNode(c)
} else {
rc.node = raft.StartNode(c, rpeers)
}
ss := &stats.ServerStats{}
ss.Initialize()
rc.transport = &rafthttp.Transport{
ID: types.ID(rc.id),
ClusterID: 0x1000,
Raft: rc,
ServerStats: ss,
LeaderStats: stats.NewLeaderStats(strconv.Itoa(rc.id)),
ErrorC: make(chan error),
}
rc.transport.Start()
for i := range rc.peers {
if i+1 != rc.id {
rc.transport.AddPeer(types.ID(i+1), []string{rc.peers[i]})
}
}
go rc.serveRaft()
go rc.serveChannels()
}
func (rc *raftNode) serveChannels() {
defer rc.wal.Close()
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
// event loop on client proposals and raft updates
for {
select {
case <-ticker.C:
rc.node.Tick()
// send proposals over raft
case prop, ok := <-rc.proposeC:
if !ok {
// client closed channel; shut down
rc.stop()
return
}
rc.node.Propose(context.TODO(), []byte(prop))
// store raft entries to wal, then publish over commit channel
case rd := <-rc.node.Ready():
rc.wal.Save(rd.HardState, rd.Entries)
rc.raftStorage.Append(rd.Entries)
rc.transport.Send(rd.Messages)
rc.publishEntries(rd.Entries)
rc.node.Advance()
case err := <-rc.transport.ErrorC:
rc.writeError(err)
return
}
}
}
func (rc *raftNode) serveRaft() {
url, err := url.Parse(rc.peers[rc.id-1])
if err != nil {
log.Fatalf("raftexample: Failed parsing URL (%v)", err)
}
srv := http.Server{Addr: url.Host, Handler: rc.transport.Handler()}
if err := srv.ListenAndServe(); err != nil {
log.Fatalf("raftexample: Failed serving rafthttp (%v)", err)
}
}
func (rc *raftNode) Process(ctx context.Context, m raftpb.Message) error {
return rc.node.Step(ctx, m)
}
func (rc *raftNode) IsIDRemoved(id uint64) bool { return false }
func (rc *raftNode) ReportUnreachable(id uint64) {}
func (rc *raftNode) ReportSnapshot(id uint64, status raft.SnapshotStatus) {}