From 7c0b6d9be9702fec4b8147e48e9afc1148b9e244 Mon Sep 17 00:00:00 2001 From: Adam Wolfe Gordon Date: Fri, 5 Feb 2016 13:13:16 -0700 Subject: [PATCH] contrib/raftexample: Allow nodes to be added to a running cluster A node with ID n can be added by POSTing the new node's URL to /n on the HTTP server. --- contrib/raftexample/httpapi.go | 41 ++++++++++++++++++++++--- contrib/raftexample/main.go | 5 +-- contrib/raftexample/raft.go | 10 ++++-- contrib/raftexample/raftexample_test.go | 2 +- 4 files changed, 49 insertions(+), 9 deletions(-) diff --git a/contrib/raftexample/httpapi.go b/contrib/raftexample/httpapi.go index ff6fd1086..b8dd6e7a7 100644 --- a/contrib/raftexample/httpapi.go +++ b/contrib/raftexample/httpapi.go @@ -19,11 +19,14 @@ import ( "log" "net/http" "strconv" + + "github.com/coreos/etcd/raft/raftpb" ) // Handler for a http based key-value store backed by raft type httpKVAPI struct { - store *kvstore + store *kvstore + confChangeC chan<- raftpb.ConfChange } func (h *httpKVAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) { @@ -48,18 +51,48 @@ func (h *httpKVAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) { } else { http.Error(w, "Failed to GET", http.StatusNotFound) } + case r.Method == "POST": + url, err := ioutil.ReadAll(r.Body) + if err != nil { + log.Printf("Failed to read on POST (%v)\n", err) + http.Error(w, "Failed on POST", http.StatusBadRequest) + return + } + + nodeId, err := strconv.ParseUint(key[1:], 0, 64) + if err != nil { + log.Printf("Failed to convert ID for conf change (%v)\n", err) + http.Error(w, "Failed on POST", http.StatusBadRequest) + return + } + + cc := raftpb.ConfChange{ + Type: raftpb.ConfChangeAddNode, + NodeID: nodeId, + Context: url, + } + h.confChangeC <- cc + + // As above, optimistic that raft will apply the conf change + w.WriteHeader(http.StatusNoContent) default: w.Header().Set("Allow", "PUT") w.Header().Add("Allow", "GET") + w.Header().Add("Allow", "POST") http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) } } // serveHttpKVAPI starts a key-value server with a GET/PUT API and listens. -func serveHttpKVAPI(port int, proposeC chan<- string, commitC <-chan *string, errorC <-chan error) { +func serveHttpKVAPI(port int, proposeC chan<- string, confChangeC chan<- raftpb.ConfChange, + commitC <-chan *string, errorC <-chan error) { + srv := http.Server{ - Addr: ":" + strconv.Itoa(port), - Handler: &httpKVAPI{newKVStore(proposeC, commitC, errorC)}, + Addr: ":" + strconv.Itoa(port), + Handler: &httpKVAPI{ + store: newKVStore(proposeC, commitC, errorC), + confChangeC: confChangeC, + }, } if err := srv.ListenAndServe(); err != nil { log.Fatal(err) diff --git a/contrib/raftexample/main.go b/contrib/raftexample/main.go index 800d4a648..8dfdcbbfe 100644 --- a/contrib/raftexample/main.go +++ b/contrib/raftexample/main.go @@ -25,6 +25,7 @@ func main() { cluster := flag.String("cluster", "http://127.0.0.1:9021", "comma separated cluster peers") id := flag.Int("id", 1, "node ID") kvport := flag.Int("port", 9121, "key-value server port") + join := flag.Bool("join", false, "join an existing cluster") flag.Parse() proposeC := make(chan string) @@ -33,8 +34,8 @@ func main() { defer close(confChangeC) // raft provides a commit stream for the proposals from the http api - commitC, errorC := newRaftNode(*id, strings.Split(*cluster, ","), proposeC, confChangeC) + commitC, errorC := newRaftNode(*id, strings.Split(*cluster, ","), *join, proposeC, confChangeC) // the key-value http handler will propose updates to raft - serveHttpKVAPI(*kvport, proposeC, commitC, errorC) + serveHttpKVAPI(*kvport, proposeC, confChangeC, commitC, errorC) } diff --git a/contrib/raftexample/raft.go b/contrib/raftexample/raft.go index a52234fb3..736e9886b 100644 --- a/contrib/raftexample/raft.go +++ b/contrib/raftexample/raft.go @@ -43,6 +43,7 @@ type raftNode struct { id int // client ID for raft session peers []string // raft peer URLs + join bool // node is joining an existing cluster waldir string // path to WAL directory // raft backing for the commit/error channel @@ -60,7 +61,7 @@ type raftNode struct { // provided the proposal channel. All log entries are replayed over the // commit channel, followed by a nil message (to indicate the channel is // current), then new log entries. To shutdown, close proposeC and read errorC. -func newRaftNode(id int, peers []string, proposeC <-chan string, +func newRaftNode(id int, peers []string, join bool, proposeC <-chan string, confChangeC <-chan raftpb.ConfChange) (<-chan *string, <-chan error) { rc := &raftNode{ @@ -70,6 +71,7 @@ func newRaftNode(id int, peers []string, proposeC <-chan string, errorC: make(chan error), id: id, peers: peers, + join: join, waldir: fmt.Sprintf("raftexample-%d", id), raftStorage: raft.NewMemoryStorage(), stopc: make(chan struct{}), @@ -186,7 +188,11 @@ func (rc *raftNode) startRaft() { if oldwal { rc.node = raft.RestartNode(c) } else { - rc.node = raft.StartNode(c, rpeers) + startPeers := rpeers + if rc.join { + startPeers = nil + } + rc.node = raft.StartNode(c, startPeers) } ss := &stats.ServerStats{} diff --git a/contrib/raftexample/raftexample_test.go b/contrib/raftexample/raftexample_test.go index 4be310c1b..a4d34f02f 100644 --- a/contrib/raftexample/raftexample_test.go +++ b/contrib/raftexample/raftexample_test.go @@ -49,7 +49,7 @@ func newCluster(n int) *cluster { os.RemoveAll(fmt.Sprintf("raftexample-%d", i+1)) clus.proposeC[i] = make(chan string, 1) clus.confChangeC[i] = make(chan raftpb.ConfChange, 1) - clus.commitC[i], clus.errorC[i] = newRaftNode(i+1, clus.peers, clus.proposeC[i], clus.confChangeC[i]) + clus.commitC[i], clus.errorC[i] = newRaftNode(i+1, clus.peers, false, clus.proposeC[i], clus.confChangeC[i]) } return clus