redirect the join command and do not redirect other commands (let client does that)

release-0.4
Xiang Li 2013-06-20 15:26:31 -07:00
parent 30da72623f
commit 58e7b456bb
6 changed files with 232 additions and 304 deletions

View File

@ -7,20 +7,15 @@ package main
import (
"github.com/benbjohnson/go-raft"
"github.com/xiangli-cmu/raft-etcd/store"
"encoding/json"
"time"
"github.com/xiangli-cmu/raft-etcd/store"
)
// A command represents an action to be taken on the replicated state machine.
type Command interface {
CommandName() string
Apply(server *raft.Server) ([]byte, error)
GeneratePath() string // Gererate a path for http request
Type() string // http request type
GetValue() string
GetKey() string
Sensitive() bool // Sensitive to the stateMachine
}
// Set command
@ -45,23 +40,6 @@ func (c *SetCommand) GeneratePath() string {
return "set/" + c.Key
}
// Get the type for http request
func (c *SetCommand) Type() string {
return "POST"
}
func (c *SetCommand) GetValue() string {
return c.Value
}
func (c *SetCommand) GetKey() string {
return c.Key
}
func (c *SetCommand) Sensitive() bool {
return true
}
// Get command
type GetCommand struct {
@ -83,23 +61,6 @@ func (c *GetCommand) GeneratePath() string{
return "get/" + c.Key
}
func (c *GetCommand) Type() string{
return "GET"
}
func (c *GetCommand) GetValue() string{
return ""
}
func (c *GetCommand) GetKey() string{
return c.Key
}
func (c *GetCommand) Sensitive() bool {
return false
}
// Delete command
type DeleteCommand struct {
Key string `json:"key"`
@ -115,27 +76,6 @@ func (c *DeleteCommand) Apply(server *raft.Server) ([]byte, error){
return store.Delete(c.Key)
}
func (c *DeleteCommand) GeneratePath() string{
return "delete/" + c.Key
}
func (c *DeleteCommand) Type() string{
return "GET"
}
func (c *DeleteCommand) GetValue() string{
return ""
}
func (c *DeleteCommand) GetKey() string{
return c.Key
}
func (c *DeleteCommand) Sensitive() bool {
return true
}
// Watch command
type WatchCommand struct {
Key string `json:"key"`
@ -158,27 +98,6 @@ func (c *WatchCommand) Apply(server *raft.Server) ([]byte, error){
return json.Marshal(res)
}
func (c *WatchCommand) GeneratePath() string{
return "watch/" + c.Key
}
func (c *WatchCommand) Type() string{
return "GET"
}
func (c *WatchCommand) GetValue() string{
return ""
}
func (c *WatchCommand) GetKey() string{
return c.Key
}
func (c *WatchCommand) Sensitive() bool {
return false
}
// JoinCommand
type JoinCommand struct {
Name string `json:"name"`
@ -193,3 +112,6 @@ func (c *JoinCommand) Apply(server *raft.Server) ([]byte, error) {
// no result will be returned
return nil, err
}

View File

@ -4,9 +4,9 @@ import (
"github.com/benbjohnson/go-raft"
"net/http"
"encoding/json"
"fmt"
//"fmt"
"io/ioutil"
"bytes"
//"bytes"
"time"
"strings"
"strconv"
@ -75,18 +75,15 @@ func SnapshotHttpHandler(w http.ResponseWriter, req *http.Request) {
func JoinHttpHandler(w http.ResponseWriter, req *http.Request) {
debug("[recv] POST http://%v/join", server.Name())
command := &JoinCommand{}
if err := decodeJsonRequest(req, command); err == nil {
if _, err= server.Do(command); err != nil {
warn("raftd: Unable to join: %v", err)
w.WriteHeader(http.StatusInternalServerError)
} else {
w.WriteHeader(http.StatusOK)
}
debug("Receive Join Request from %s", command.Name)
excute(command, &w)
} else {
warn("[join] ERROR: %v", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
}
@ -94,8 +91,6 @@ func JoinHttpHandler(w http.ResponseWriter, req *http.Request) {
func SetHttpHandler(w http.ResponseWriter, req *http.Request) {
key := req.URL.Path[len("/set/"):]
debug("[recv] POST http://%v/set/%s", server.Name(), key)
content, err := ioutil.ReadAll(req.Body)
if err != nil {
@ -104,6 +99,8 @@ func SetHttpHandler(w http.ResponseWriter, req *http.Request) {
return
}
debug("[recv] POST http://%v/set/%s [%s]", server.Name(), key, content)
command := &SetCommand{}
command.Key = key
values := strings.Split(string(content), " ")
@ -122,18 +119,7 @@ func SetHttpHandler(w http.ResponseWriter, req *http.Request) {
command.ExpireTime = time.Unix(0,0)
}
Dispatch(server, command, w)
}
func GetHttpHandler(w http.ResponseWriter, req *http.Request) {
key := req.URL.Path[len("/get/"):]
debug("[recv] GET http://%v/get/%s", server.Name(), key)
command := &GetCommand{}
command.Key = key
Dispatch(server, command, w)
excute(command, &w)
}
@ -145,10 +131,58 @@ func DeleteHttpHandler(w http.ResponseWriter, req *http.Request) {
command := &DeleteCommand{}
command.Key = key
Dispatch(server, command, w)
excute(command, &w)
}
func excute(c Command, w *http.ResponseWriter) {
if server.State() == "leader" {
if body, err := server.Do(c); err != nil {
warn("raftd: Unable to write file: %v", err)
(*w).WriteHeader(http.StatusInternalServerError)
return
} else {
(*w).WriteHeader(http.StatusOK)
(*w).Write(body)
return
}
} else {
// tell the client where is the leader
(*w).WriteHeader(http.StatusTemporaryRedirect)
(*w).Write([]byte(server.Leader()))
return
}
(*w).WriteHeader(http.StatusInternalServerError)
return
}
func MasterHttpHandler(w http.ResponseWriter, req *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write([]byte(server.Leader()))
}
func GetHttpHandler(w http.ResponseWriter, req *http.Request) {
key := req.URL.Path[len("/get/"):]
debug("[recv] GET http://%v/get/%s", server.Name(), key)
command := &GetCommand{}
command.Key = key
if body, err := command.Apply(server); err != nil {
warn("raftd: Unable to write file: %v", err)
w.WriteHeader(http.StatusInternalServerError)
return
} else {
w.WriteHeader(http.StatusOK)
w.Write(body)
return
}
}
func WatchHttpHandler(w http.ResponseWriter, req *http.Request) {
key := req.URL.Path[len("/watch/"):]
@ -157,105 +191,19 @@ func WatchHttpHandler(w http.ResponseWriter, req *http.Request) {
command := &WatchCommand{}
command.Key = key
Dispatch(server, command, w)
}
func Dispatch(server *raft.Server, command Command, w http.ResponseWriter) {
var body []byte
var err error
debug("Dispatch command")
// i am the leader, i will take care of the command
if server.State() == "leader" {
// if the command will change the state of the state machine
// the command need to append to the log entry
if command.Sensitive() {
if body, err = server.Do(command); err != nil {
warn("raftd: Unable to write file: %v", err)
w.WriteHeader(http.StatusInternalServerError)
return
} else {
// good to go
w.WriteHeader(http.StatusOK)
w.Write(body)
return
}
// for non-sentitive command, directly apply it
} else {
if body, err = command.Apply(server); err != nil {
warn("raftd: Unable to write file: %v", err)
w.WriteHeader(http.StatusInternalServerError)
return
} else {
w.WriteHeader(http.StatusOK)
w.Write(body)
return
}
}
// redirect the command to the current leader
if body, err := command.Apply(server); err != nil {
warn("raftd: Unable to write file: %v", err)
w.WriteHeader(http.StatusInternalServerError)
return
} else {
leaderName := server.Leader()
w.WriteHeader(http.StatusOK)
w.Write(body)
return
}
if leaderName =="" {
// no luckey, during the voting process
// the client need to catch the error and try again
w.WriteHeader(http.StatusInternalServerError)
return
}
debug("forward command to %s", leaderName)
path := command.GeneratePath()
if command.Type() == "POST" {
debug("[send] POST http://%v/%s", leaderName, path)
reader := bytes.NewReader([]byte(command.GetValue()))
// t must be ok
t,_ := server.Transporter().(transHandler)
reps, _ := t.client.Post(fmt.Sprintf("http://%v/%s",
leaderName, command.GeneratePath()), "application/json", reader)
if reps == nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
// forwarding
w.WriteHeader(reps.StatusCode)
body, _ := ioutil.ReadAll(reps.Body)
w.Write(body)
return
} else if command.Type() == "GET" {
debug("[send] GET http://%v/%s", leaderName, path)
reps, _ := http.Get(fmt.Sprintf("http://%v/%s",
leaderName, command.GeneratePath()))
if reps == nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
// forwarding
body, _ := ioutil.ReadAll(reps.Body)
w.WriteHeader(reps.StatusCode)
w.Write(body)
} else {
//unsupported type
}
}
}

View File

@ -27,6 +27,7 @@ import (
//
//------------------------------------------------------------------------------
var verbose bool
var leaderHost string
var address string
@ -44,14 +45,18 @@ func init() {
flag.StringVar(&certFile, "cert", "", "the cert file of the server")
flag.StringVar(&keyFile, "key", "", "the key file of the server")
}
// CONSTANTS
const (
HTTP = iota
HTTPS
HTTPSANDVERIFY
)
const (
ELECTIONTIMTOUT = 3 * time.Second
HEARTBEATTIMEOUT = 1 * time.Second
)
//------------------------------------------------------------------------------
//
// Typedefs
@ -74,13 +79,6 @@ var logger *log.Logger
var storeMsg chan string
// CONSTANTS
const (
HTTP = iota
HTTPS
HTTPSANDVERIFY
)
//------------------------------------------------------------------------------
//
@ -247,6 +245,7 @@ func startTransport(port int, st int) {
http.HandleFunc("/get/", GetHttpHandler)
http.HandleFunc("/delete/", DeleteHttpHandler)
http.HandleFunc("/watch/", WatchHttpHandler)
http.HandleFunc("/master", MasterHttpHandler)
switch st {
@ -375,26 +374,30 @@ func Join(s *raft.Server, serverName string) error {
json.NewEncoder(&b).Encode(command)
var resp *http.Response
var err error
// t must be ok
t,_ := server.Transporter().(transHandler)
if t.client != nil {
debug("[send] POST https://%v/join", "localhost:4001")
resp, err = t.client.Post(fmt.Sprintf("https://%s/join", serverName), "application/json", &b)
} else {
debug("[send] POST http://%v/join", "localhost:4001")
resp, err = http.Post(fmt.Sprintf("https://%s/join", serverName), "application/json", &b)
}
debug("Send Join Request to %s", serverName)
resp, err := Post(&t, fmt.Sprintf("%s/join", serverName), &b)
if resp != nil {
resp.Body.Close()
if resp.StatusCode == http.StatusOK {
return nil
for {
if resp != nil {
defer resp.Body.Close()
if resp.StatusCode == http.StatusOK {
return nil
}
if resp.StatusCode == http.StatusTemporaryRedirect {
address, err := ioutil.ReadAll(resp.Body)
if err != nil {
warn("Cannot Read Leader info: %v", err)
}
debug("Leader is %s", address)
debug("Send Join Request to %s", address)
json.NewEncoder(&b).Encode(command)
resp, err = Post(&t, fmt.Sprintf("%s/join", address), &b)
}
}
}
return fmt.Errorf("raftd: Unable to join: %v", err)
return fmt.Errorf("Unable to join: %v", err)
}
//--------------------------------------
// Web Helper
@ -431,6 +434,27 @@ func encodeJsonResponse(w http.ResponseWriter, status int, data interface{}) {
}
}
func Post(t *transHandler, path string, body io.Reader) (*http.Response, error){
if t.client != nil {
resp, err := t.client.Post("https://" + path, "application/json", body)
return resp, err
} else {
resp, err := http.Post("http://" + path, "application/json", body)
return resp, err
}
}
func Get(t *transHandler, path string) (*http.Response, error) {
if t.client != nil {
resp, err := t.client.Get("https://" + path)
return resp, err
} else {
resp, err := http.Get("http://" + path)
return resp, err
}
}
//--------------------------------------
// Log
//--------------------------------------
@ -446,7 +470,7 @@ func info(msg string, v ...interface{}) {
}
func warn(msg string, v ...interface{}) {
logger.Printf("WARN " + msg + "\n", v...)
logger.Printf("Alpaca Server: WARN " + msg + "\n", v...)
}
func fatal(msg string, v ...interface{}) {

View File

@ -7,6 +7,9 @@ import (
"fmt"
)
// global store
var s *Store
// CONSTANTS
const (
ERROR = -1 + iota
@ -15,14 +18,27 @@ const (
GET
)
var PERMANENT = time.Unix(0,0)
type Store struct {
// use the build-in hash map as the key-value store structure
Nodes map[string]Node `json:"nodes"`
// the string channel to send messages to the outside world
// now we use it to send changes to the hub of the web service
messager *chan string
}
type Node struct {
Value string `json:"value"`
// if the node is a permanent one the ExprieTime will be Unix(0,0)
// Otherwise after the expireTime, the node will be deleted
ExpireTime time.Time `json:"expireTime"`
// a channel to update the expireTime of the node
update chan time.Time `json:"-"`
}
@ -31,14 +47,14 @@ type Response struct {
Key string `json:"key"`
OldValue string `json:"oldValue"`
NewValue string `json:"newValue"`
// if the key existed before the action, this field should be true
// if the key did not exist before the action, this field should be false
Exist bool `json:"exist"`
Expiration time.Time `json:"expiration"`
}
// global store
var s *Store
func init() {
s = createStore()
s.messager = nil
@ -51,10 +67,12 @@ func createStore() *Store{
return s
}
// return a pointer to the store
func GetStore() *Store {
return s
}
// set the messager of the store
func (s *Store)SetMessager(messager *chan string) {
s.messager = messager
}
@ -66,44 +84,45 @@ func Set(key string, value string, expireTime time.Time) ([]byte, error) {
var isExpire bool = false
isExpire = !expireTime.Equal(time.Unix(0,0))
isExpire = !expireTime.Equal(PERMANENT)
// when the slow follower receive the set command
// the key may be expired, we need also to delete
// the previous value of key
// the key may be expired, we should not add the node
// also if the node exist, we need to delete the node
if isExpire && expireTime.Sub(time.Now()) < 0 {
return Delete(key)
}
// get the node
node, ok := s.Nodes[key]
if ok {
// if node is not permanent before
// update its expireTime
if !node.ExpireTime.Equal(time.Unix(0,0)) {
if !node.ExpireTime.Equal(PERMANENT) {
node.update <- expireTime
} else {
// if we want the permanent to have expire time
// we need to create a chan and create a func
// if we want the permanent node to have expire time
// we need to create a chan and create a go routine
if isExpire {
node.update = make(chan time.Time)
go expire(key, node.update, expireTime)
}
}
// update the information of the node
node.ExpireTime = expireTime
node.Value = value
notify(SET, key, node.Value, value, true)
msg, err := json.Marshal(Response{SET, key, node.Value, value, true, expireTime})
resp := Response{SET, key, node.Value, value, true, expireTime}
// notify the web interface
msg, err := json.Marshal(resp)
notify(resp)
// send to the messager
if (s.messager != nil && err == nil) {
*s.messager <- string(msg)
@ -111,21 +130,23 @@ func Set(key string, value string, expireTime time.Time) ([]byte, error) {
return msg, err
// add new node
} else {
// add new node
update := make(chan time.Time)
s.Nodes[key] = Node{value, expireTime, update}
// nofity the watcher
notify(SET, key, "", value, false)
if isExpire {
go expire(key, update, expireTime)
}
msg, err := json.Marshal(Response{SET, key, "", value, false, expireTime})
resp := Response{SET, key, "", value, false, expireTime}
msg, err := json.Marshal(resp)
// nofity the watcher
notify(resp)
// notify the web interface
if (s.messager != nil && err == nil) {
@ -137,23 +158,45 @@ func Set(key string, value string, expireTime time.Time) ([]byte, error) {
}
}
// delete the key when it expires
// should be used as a go routine to delete the key when it expires
func expire(key string, update chan time.Time, expireTime time.Time) {
duration := expireTime.Sub(time.Now())
for {
select {
// timeout delte key
// timeout delete the node
case <-time.After(duration):
fmt.Println("expired at ", time.Now())
Delete(key)
return
node, ok := s.Nodes[key]
if !ok {
return
} else {
delete(s.Nodes, key)
resp := Response{DELETE, key, node.Value, "", true, node.ExpireTime}
msg, err := json.Marshal(resp)
notify(resp)
// notify the messager
if (s.messager != nil && err == nil) {
*s.messager <- string(msg)
}
return
}
case updateTime := <-update:
//update duration
if updateTime.Equal(time.Unix(0,0)) {
fmt.Println("node became stable")
// if the node become a permanent one, the go routine is
// not needed
if updateTime.Equal(PERMANENT) {
return
}
// update duration
duration = updateTime.Sub(time.Now())
}
}
@ -172,20 +215,33 @@ func Get(key string) Response {
}
}
// delete the key, return the old value if the key exists
// delete the key
func Delete(key string) ([]byte, error) {
key = path.Clean(key)
node, ok := s.Nodes[key]
if ok {
delete(s.Nodes, key)
notify(DELETE, key, node.Value, "", true)
if node.ExpireTime.Equal(PERMANENT) {
msg, err := json.Marshal(Response{DELETE, key, node.Value, "", true, node.ExpireTime})
delete(s.Nodes, key)
// notify the web interface
} else {
// kill the expire go routine
node.update <- PERMANENT
delete(s.Nodes, key)
}
resp := Response{DELETE, key, node.Value, "", true, node.ExpireTime}
msg, err := json.Marshal(resp)
notify(resp)
// notify the messager
if (s.messager != nil && err == nil) {
*s.messager <- string(msg)
@ -194,7 +250,6 @@ func Delete(key string) ([]byte, error) {
return msg, err
} else {
// no notify to the watcher and web interface
return json.Marshal(Response{DELETE, key, "", "", false, time.Unix(0, 0)})
}
@ -213,20 +268,25 @@ func (s *Store)Save() ([]byte, error) {
// recovery the state of the stroage system from a previous state
func (s *Store)Recovery(state []byte) error {
err := json.Unmarshal(state, s)
// clean the expired nodes
clean()
return err
}
// clean all expired keys
func clean() {
for key, node := range s.Nodes{
// stable node
if node.ExpireTime.Equal(time.Unix(0,0)) {
if node.ExpireTime.Equal(PERMANENT) {
continue
} else {
if node.ExpireTime.Sub(time.Now()) >= time.Second {
node.update = make(chan time.Time)
go expire(key, node.update, node.ExpireTime)
} else {
// we should delete this node
delete(s.Nodes, key)

View File

@ -4,7 +4,6 @@ import (
"path"
"strings"
//"fmt"
"time"
)
@ -49,9 +48,10 @@ func AddWatcher(prefix string, c chan Response) error {
}
// notify the watcher a action happened
func notify(action int, key string, oldValue string, newValue string, exist bool) error {
key = path.Clean(key)
segments := strings.Split(key, "/")
func notify(resp Response) error {
resp.Key = path.Clean(resp.Key)
segments := strings.Split(resp.Key, "/")
currPath := "/"
// walk through all the pathes
@ -62,11 +62,9 @@ func notify(action int, key string, oldValue string, newValue string, exist bool
if ok {
n := Response {action, key, oldValue, newValue, exist, time.Unix(0, 0)}
// notify all the watchers
for _, c := range chans {
c <- n
c <- resp
}
// we have notified all the watchers at this path

View File

@ -20,17 +20,8 @@ func (t transHandler) SendAppendEntriesRequest(server *raft.Server, peer *raft.P
var b bytes.Buffer
json.NewEncoder(&b).Encode(req)
resp, err := Post(&t, fmt.Sprintf("%s/log/append", peer.Name()), &b)
var resp *http.Response
var err error
if t.client != nil {
debug("[send] POST https://%s/log/append [%d]", peer.Name(), len(req.Entries))
resp, err = http.Post(fmt.Sprintf("https://%s/log/append", peer.Name()), "application/json", &b)
} else {
debug("[send] POST http://%s/log/append [%d]", peer.Name(), len(req.Entries))
resp, err = t.client.Post(fmt.Sprintf("http://%s/log/append", peer.Name()), "application/json", &b)
}
if resp != nil {
defer resp.Body.Close()
aersp = &raft.AppendEntriesResponse{}
@ -48,16 +39,7 @@ func (t transHandler) SendVoteRequest(server *raft.Server, peer *raft.Peer, req
var b bytes.Buffer
json.NewEncoder(&b).Encode(req)
var resp *http.Response
var err error
if t.client != nil {
debug("[send] POST https://%s/vote", peer.Name())
resp, err = t.client.Post(fmt.Sprintf("https://%s/vote", peer.Name()), "application/json", &b)
} else {
debug("[send] POST http://%s/vote", peer.Name())
resp, err = http.Post(fmt.Sprintf("http://%s/vote", peer.Name()), "application/json", &b)
}
resp, err := Post(&t, fmt.Sprintf("%s/vote", peer.Name()), &b)
if resp != nil {
defer resp.Body.Close()
@ -76,16 +58,10 @@ func (t transHandler) SendSnapshotRequest(server *raft.Server, peer *raft.Peer,
var b bytes.Buffer
json.NewEncoder(&b).Encode(req)
var resp *http.Response
var err error
debug("[send] POST %s/snapshot [%d %d]", peer.Name(), req.LastTerm, req.LastIndex)
resp, err := Post(&t, fmt.Sprintf("%s/snapshot", peer.Name()), &b)
if t.client != nil {
debug("[send] POST https://%s/snapshot [%d %d]", peer.Name(), req.LastTerm, req.LastIndex)
resp, err = t.client.Post(fmt.Sprintf("https://%s/snapshot", peer.Name()), "application/json", &b)
} else {
debug("[send] POST http://%s/snapshot [%d %d]", peer.Name(), req.LastTerm, req.LastIndex)
resp, err = http.Post(fmt.Sprintf("http://%s/snapshot", peer.Name()), "application/json", &b)
}
if resp != nil {
defer resp.Body.Close()
aersp = &raft.SnapshotResponse{}