clean up the codes

release-0.4
Xiang Li 2013-06-13 11:01:06 -07:00
parent 4b4a7c4976
commit 56f28f80b2
6 changed files with 136 additions and 131 deletions

View File

@ -10,16 +10,15 @@ import (
"encoding/json"
)
// A command represents an action to be taken on the replicated state machine.
type Command interface {
CommandName() string
Apply(server *raft.Server) ([]byte, error)
GeneratePath() string
Type() string
GeneratePath() string // Gererate a path for http request
Type() string // http request type
GetValue() string
GetKey() string
Sensitive() bool
Sensitive() bool // Sensitive to the stateMachine
}
// Set command
@ -39,10 +38,12 @@ func (c *SetCommand) Apply(server *raft.Server) ([]byte, error) {
return json.Marshal(res)
}
// Get the path for http request
func (c *SetCommand) GeneratePath() string {
return "set/" + c.Key
}
// Get the type for http request
func (c *SetCommand) Type() string {
return "POST"
}
@ -96,6 +97,7 @@ func (c *GetCommand) Sensitive() bool {
return false
}
// Delete command
type DeleteCommand struct {
Key string `json:"key"`
@ -146,8 +148,10 @@ func (c *WatchCommand) CommandName() string {
func (c *WatchCommand) Apply(server *raft.Server) ([]byte, error){
ch := make(chan Response)
// add to the watchers list
w.add(c.Key, ch)
// wait for the notification for any changing
res := <- ch
return json.Marshal(res)
@ -173,6 +177,7 @@ func (c *WatchCommand) Sensitive() bool {
return false
}
// JoinCommand
type JoinCommand struct {
Name string `json:"name"`
@ -184,5 +189,6 @@ func (c *JoinCommand) CommandName() string {
func (c *JoinCommand) Apply(server *raft.Server) ([]byte, error) {
err := server.AddPeer(c.Name)
// no result will be returned
return nil, err
}

View File

@ -9,12 +9,12 @@ import (
"bytes"
)
//--------------------------------------
// HTTP Handlers
//--------------------------------------
// Get all the current logs
func GetLogHttpHandler(w http.ResponseWriter, req *http.Request) {
debug("[recv] GET http://%v/log", server.Name())
w.Header().Set("Content-Type", "application/json")
@ -33,6 +33,7 @@ func VoteHttpHandler(w http.ResponseWriter, req *http.Request) {
return
}
}
warn("[vote] ERROR: %v", err)
w.WriteHeader(http.StatusInternalServerError)
}
@ -41,19 +42,16 @@ func AppendEntriesHttpHandler(w http.ResponseWriter, req *http.Request) {
err := decodeJsonRequest(req, aereq)
if err == nil {
debug("[recv] POST http://%s/log/append [%d]", server.Name(), len(aereq.Entries))
debug("My role is %s", server.State())
if resp, _ := server.AppendEntries(aereq); resp != nil {
debug("write back success")
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(resp)
if !resp.Success {
fmt.Println("append error")
debug("[Append Entry] Step back")
}
return
}
}
warn("[append] ERROR: %v", err)
debug("write back")
w.WriteHeader(http.StatusInternalServerError)
}
@ -96,6 +94,7 @@ func SetHttpHandler(w http.ResponseWriter, req *http.Request) {
debug("[recv] POST http://%v/set/%s", server.Name(), key)
content, err := ioutil.ReadAll(req.Body)
if err != nil {
warn("raftd: Unable to read: %v", err)
w.WriteHeader(http.StatusInternalServerError)
@ -107,7 +106,6 @@ func SetHttpHandler(w http.ResponseWriter, req *http.Request) {
command.Value = string(content)
Dispatch(server, command, w)
}
func GetHttpHandler(w http.ResponseWriter, req *http.Request) {
@ -131,7 +129,6 @@ func DeleteHttpHandler(w http.ResponseWriter, req *http.Request) {
command.Key = key
Dispatch(server, command, w)
}
@ -151,91 +148,94 @@ func Dispatch(server *raft.Server, command Command, w http.ResponseWriter) {
var body []byte
var err error
debug("Dispatch command")
fmt.Println("dispatch")
// unlikely to fail twice
for {
// i am the leader, i will take care of the command
if server.State() == "leader" {
if command.Sensitive() {
if body, err = server.Do(command); err != nil {
warn("raftd: Unable to write file: %v", err)
w.WriteHeader(http.StatusInternalServerError)
return
} else {
// good to go
w.WriteHeader(http.StatusOK)
w.Write(body)
return
}
} else {
fmt.Println("non-sensitive")
if body, err = command.Apply(server); err != nil {
warn("raftd: Unable to write file: %v", err)
w.WriteHeader(http.StatusInternalServerError)
return
} else {
// good to go
w.WriteHeader(http.StatusOK)
w.Write(body)
return
}
}
// redirect the command to the current leader
} else {
leaderName := server.Leader()
if leaderName =="" {
// no luckey, during the voting process
// i am the leader, i will take care of the command
if server.State() == "leader" {
// if the command will change the state of the state machine
// the command need to append to the log entry
if command.Sensitive() {
if body, err = server.Do(command); err != nil {
warn("raftd: Unable to write file: %v", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
fmt.Println("forward to ", leaderName)
path := command.GeneratePath()
if command.Type() == "POST" {
debug("[send] POST http://%v/%s", leaderName, path)
reader := bytes.NewReader([]byte(command.GetValue()))
reps, _ := http.Post(fmt.Sprintf("http://%v/%s",
leaderName, command.GeneratePath()), "application/json", reader)
body, _ := ioutil.ReadAll(reps.Body)
fmt.Println(body)
} else {
// good to go
w.WriteHeader(http.StatusOK)
w.Write(body)
return
}
// for non-sentitive command, directly apply it
} else {
if body, err = command.Apply(server); err != nil {
warn("raftd: Unable to write file: %v", err)
w.WriteHeader(http.StatusInternalServerError)
return
} else {
w.WriteHeader(http.StatusOK)
w.Write(body)
return
}
}
// redirect the command to the current leader
} else {
leaderName := server.Leader()
if leaderName =="" {
// no luckey, during the voting process
// the client need to catch the error and try again
w.WriteHeader(http.StatusInternalServerError)
return
}
debug("forward command to %s", leaderName)
path := command.GeneratePath()
if command.Type() == "POST" {
debug("[send] POST http://%v/%s", leaderName, path)
reader := bytes.NewReader([]byte(command.GetValue()))
reps, _ := http.Post(fmt.Sprintf("http://%v/%s",
leaderName, command.GeneratePath()), "application/json", reader)
if reps == nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
// forwarding
w.WriteHeader(reps.StatusCode)
body, _ := ioutil.ReadAll(reps.Body)
w.Write(body)
return
} else if command.Type() == "GET" {
debug("[send] GET http://%v/%s", leaderName, path)
reps, _ := http.Get(fmt.Sprintf("http://%v/%s",
leaderName, command.GeneratePath()))
// good to go
body, _ := ioutil.ReadAll(reps.Body)
fmt.Println(body)
w.WriteHeader(http.StatusOK)
if reps == nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
// forwarding
body, _ := ioutil.ReadAll(reps.Body)
w.WriteHeader(reps.StatusCode)
w.Write(body)
} else {
//unsupported type
}
if err != nil {
// should check other errors
continue
} else {
//good to go
return
//unsupported type
}
}
}
}

View File

@ -13,6 +13,7 @@ import (
"strings"
"os"
"time"
"strconv"
)
//------------------------------------------------------------------------------
@ -22,12 +23,22 @@ import (
//------------------------------------------------------------------------------
var verbose bool
var leaderHost string
var address string
func init() {
flag.BoolVar(&verbose, "v", false, "verbose logging")
flag.BoolVar(&verbose, "verbose", false, "verbose logging")
flag.StringVar(&leaderHost, "c", "", "join to a existing cluster")
flag.StringVar(&address, "a", "", "the address of the local machine")
}
const (
ELECTIONTIMTOUT = 3 * time.Second
HEARTBEATTIMEOUT = 1 * time.Second
)
//------------------------------------------------------------------------------
//
// Typedefs
@ -62,9 +73,6 @@ func main() {
var err error
logger = log.New(os.Stdout, "", log.LstdFlags)
flag.Parse()
if verbose {
fmt.Println("Verbose logging enabled.\n")
}
// Setup commands.
raft.RegisterCommand(&JoinCommand{})
@ -85,65 +93,54 @@ func main() {
// Read server info from file or grab it from user.
var info *Info = getInfo(path)
name := fmt.Sprintf("%s:%d", info.Host, info.Port)
fmt.Printf("Name: %s\n\n", name)
t := transHandler{}
// Setup new raft server.
server, err = raft.NewServer(name, path, t, s, nil)
//server.DoHandler = DoHandler;
if err != nil {
fatal("%v", err)
}
server.LoadSnapshot()
server.Initialize()
fmt.Println("1 join as ", server.State(), " term ", server.Term())
// Join to another server if we don't have a log.
server.SetElectionTimeout(ELECTIONTIMTOUT)
server.SetHeartbeatTimeout(HEARTBEATTIMEOUT)
if server.IsLogEmpty() {
var leaderHost string
fmt.Println("2 join as ", server.State(), " term ", server.Term())
fmt.Println("This server has no log. Please enter a server in the cluster to join\nto or hit enter to initialize a cluster.");
fmt.Printf("Join to (host:port)> ");
fmt.Scanf("%s", &leaderHost)
fmt.Println("3 join as ", server.State(), " term ", server.Term())
// start as a leader in a new cluster
if leaderHost == "" {
fmt.Println("init")
//server.SetElectionTimeout(300 * time.Millisecond)
//server.SetHeartbeatTimeout(100 * time.Millisecond)
server.SetElectionTimeout(3 * time.Second)
server.SetHeartbeatTimeout(1 * time.Second)
server.StartHeartbeatTimeout()
server.StartLeader()
// join self
// join self as a peer
command := &JoinCommand{}
command.Name = server.Name()
server.Do(command)
// start as a fellower in a existing cluster
} else {
//server.SetElectionTimeout(300 * time.Millisecond)
//server.SetHeartbeatTimeout(100 * time.Millisecond)
server.SetElectionTimeout(3 * time.Second)
server.SetHeartbeatTimeout(1 * time.Second)
server.StartElectionTimeout()
server.StartFollower()
fmt.Println("4 join as ", server.State(), " term ", server.Term())
Join(server, leaderHost)
fmt.Println("success join")
}
// rejoin the previous cluster
} else {
//server.SetElectionTimeout(300 * time.Millisecond)
//server.SetHeartbeatTimeout(100 * time.Millisecond)
server.SetElectionTimeout(3 * time.Second)
server.SetHeartbeatTimeout(1 * time.Second)
server.StartElectionTimeout()
server.StartFollower()
}
// open the snapshot
go server.Snapshot()
// open snapshot
//go server.Snapshot()
// internal commands
http.HandleFunc("/join", JoinHttpHandler)
@ -158,6 +155,7 @@ func main() {
http.HandleFunc("/delete/", DeleteHttpHandler)
http.HandleFunc("/watch/", WatchHttpHandler)
// listen on http port
log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", info.Port), nil))
}
@ -186,17 +184,24 @@ func getInfo(path string) *Info {
// Otherwise ask user for info and write it to file.
} else {
fmt.Printf("Enter hostname: [localhost] ");
fmt.Scanf("%s", &info.Host)
info.Host = strings.TrimSpace(info.Host)
if info.Host == "" {
info.Host = "localhost"
if address == "" {
fatal("Please give the address of the local machine")
}
fmt.Printf("Enter port: [4001] ");
fmt.Scanf("%d", &info.Port)
if info.Port == 0 {
info.Port = 4001
input := strings.Split(address, ":")
if len(input) != 2 {
fatal("Wrong address %s", address)
}
info.Host = input[0]
info.Host = strings.TrimSpace(info.Host)
info.Port, err = strconv.Atoi(input[1])
if err != nil {
fatal("Wrong port %s", address)
}
// Write to file.
@ -218,6 +223,7 @@ func getInfo(path string) *Info {
// Send join requests to the leader.
func Join(s *raft.Server, serverName string) error {
var b bytes.Buffer
command := &JoinCommand{}
command.Name = s.Name()

View File

@ -3,7 +3,6 @@ package main
import (
"path"
"encoding/json"
//"fmt"
)
// CONSTANTS

View File

@ -64,6 +64,5 @@ func (t transHandler) SendSnapshotRequest(server *raft.Server, peer *raft.Peer,
return aersp, nil
}
}
fmt.Println("error send snapshot")
return aersp, fmt.Errorf("raftd: Unable to send snapshot: %v", err)
}

View File

@ -3,7 +3,6 @@ package main
import (
"path"
"strings"
"fmt"
)
@ -30,7 +29,7 @@ func createWatcher() *Watcher {
func (w *Watcher) add(prefix string, c chan Response) error {
prefix = "/" + path.Clean(prefix)
fmt.Println("Add ", prefix)
debug("Add a watche at ", prefix)
_, ok := w.chanMap[prefix]
if !ok {
@ -40,15 +39,12 @@ func (w *Watcher) add(prefix string, c chan Response) error {
w.chanMap[prefix] = append(w.chanMap[prefix], c)
}
fmt.Println(len(w.chanMap[prefix]), "@", prefix)
return nil
}
// notify the watcher a action happened
func (w *Watcher) notify(action int, key string, oldValue string, newValue string, exist bool) error {
key = path.Clean(key)
fmt.Println("notify")
segments := strings.Split(key, "/")
currPath := "/"
@ -58,19 +54,18 @@ func (w *Watcher) notify(action int, key string, oldValue string, newValue strin
currPath := path.Join(currPath, segment)
fmt.Println(currPath)
chans, ok := w.chanMap[currPath]
if ok {
fmt.Println("found ", currPath)
debug("Notify at ", currPath)
n := Response {action, key, oldValue, newValue, exist}
// notify all the watchers
for _, c := range chans {
c <- n
}
// we have notified all the watchers at this path
// delete the map
delete(w.chanMap, currPath)