Merge branch 'separate-id' of https://github.com/philips/etcd into separate_id
commit
eadced5dc8
130
README.md
130
README.md
|
@ -9,9 +9,9 @@ A highly-available key value store for shared configuration and service discover
|
|||
* Fast: benchmarked 1000s of writes/s per instance
|
||||
* Reliable: Properly distributed using Raft
|
||||
|
||||
Etcd is written in go and uses the [raft][raft] consensus algorithm to manage a highly availably replicated log.
|
||||
Etcd is written in Go and uses the [raft][raft] consensus algorithm to manage a highly availably replicated log.
|
||||
|
||||
See [go-etcd][go-etcd] for a native go client. Or feel free to just use curl, as in the examples below.
|
||||
See [go-etcd][go-etcd] for a native Go client. Or feel free to just use curl, as in the examples below.
|
||||
|
||||
[raft]: https://github.com/coreos/go-raft
|
||||
[go-etcd]: https://github.com/coreos/go-etcd
|
||||
|
@ -31,12 +31,14 @@ To build etcd run the build script. This will generate a binary in the base dire
|
|||
These examples will use a single node cluster to show you the basics of the etcd REST API. Lets start etcd:
|
||||
|
||||
```sh
|
||||
./etcd
|
||||
./etcd -d node0
|
||||
```
|
||||
|
||||
This will bring up a node, which will be listening on internal port 7001 (for server communication) and external port 4001 (for client communication)
|
||||
This will bring up an etcd node listening on port 4001 for client communication and on port 7001 for server-to-server communication. The `-d node0` argument tells etcd to write node configuration, logs and snapshots to the `./node0/` directory.
|
||||
|
||||
#### Setting the value to a key
|
||||
## Usage
|
||||
|
||||
### Setting the value to a key
|
||||
|
||||
Let’s set the first key-value pair to the node. In this case the key is `/message` and the value is `Hello world`.
|
||||
|
||||
|
@ -51,7 +53,7 @@ curl -L http://127.0.0.1:4001/v1/keys/message -d value="Hello world"
|
|||
This response contains five fields. We will introduce three more fields as we try more commands.
|
||||
|
||||
1. The action of the request; we set the value via a POST request, thus the action is `SET`.
|
||||
|
||||
|
||||
2. The key of the request; we set `/message` to `Hello world!`, so the key field is `/message`.
|
||||
Notice we use a file system like structure to represent the key-value pairs. So each key starts with `/`.
|
||||
|
||||
|
@ -59,9 +61,9 @@ Notice we use a file system like structure to represent the key-value pairs. So
|
|||
|
||||
4. If we set a new key; `/message` did not exist before, so this is a new key.
|
||||
|
||||
5. Index field is the unique request index of the set request. Each sensitive request we send to the server will have a unique request index. The current sensitive request are `SET`, `DELETE` and `TESTANDSET`. All of these request will change the state of the key-value store system, thus they are sensitive. `GET`, `LIST` and `WATCH` are non-sensitive commands. Those commands will not change the state of the key-value store system. You may notice that in this example the index is 3, although it is the first request you sent to the server. This is because there are some internal commands that also change the state of the server, we also need to assign them command indexes(Command used to add a server and sync the servers).
|
||||
5. Index is the unique internal log index of the set request. Requests that change the log index include `SET`, `DELETE` and `TESTANDSET`. The `GET`, `LIST` and `WATCH` commands do not change state in the store and so they do not change the index. You may notice that in this example the index is 3, although it is the first request you sent to the server. This is because there are internal commands that also change the state like adding and syncing servers.
|
||||
|
||||
#### Getting the value of a key
|
||||
### Get the value of a key
|
||||
|
||||
Get the value that we just set in `/message` by issuing a GET:
|
||||
|
||||
|
@ -72,7 +74,7 @@ curl -L http://127.0.0.1:4001/v1/keys/message
|
|||
```json
|
||||
{"action":"GET","key":"/message","value":"Hello world","index":3}
|
||||
```
|
||||
#### Changing the value of a key
|
||||
### Change the value of a key
|
||||
|
||||
Change the value of `/message` from `Hello world` to `Hello etcd` with another POST to the key:
|
||||
|
||||
|
@ -86,7 +88,7 @@ curl -L http://127.0.0.1:4001/v1/keys/message -d value="Hello etcd"
|
|||
|
||||
Notice that the `prevValue` is set to `Hello world`.
|
||||
|
||||
#### Deleting a key
|
||||
### Delete a key
|
||||
|
||||
Remove the `/message` key with a DELETE:
|
||||
|
||||
|
@ -98,7 +100,7 @@ curl -L http://127.0.0.1:4001/v1/keys/message -X DELETE
|
|||
{"action":"DELETE","key":"/message","prevValue":"Hello etcd","index":5}
|
||||
```
|
||||
|
||||
#### Using a TTL on a key
|
||||
### Using key TTL
|
||||
|
||||
Keys in etcd can be set to expire after a specified number of seconds. That is done by setting a TTL (time to live) on the key when you POST:
|
||||
|
||||
|
@ -124,12 +126,11 @@ curl -L http://127.0.0.1:4001/v1/keys/foo
|
|||
|
||||
If the TTL has expired, the key will be deleted, and you will be returned a 404.
|
||||
|
||||
```html
|
||||
404 page not found
|
||||
```json
|
||||
{"errorCode":100,"message":"Key Not Found","cause":"/foo"}
|
||||
```
|
||||
|
||||
|
||||
#### Watching a prefix
|
||||
### Watching a prefix
|
||||
|
||||
We can watch a path prefix and get notifications if any key change under that prefix.
|
||||
|
||||
|
@ -163,13 +164,9 @@ curl -L http://127.0.0.1:4001/v1/watch/foo -d index=7
|
|||
|
||||
The watch command returns immediately with the same response as previous.
|
||||
|
||||
#### Atomic Test and Set
|
||||
### Atomic Test and Set
|
||||
|
||||
Etcd servers will process all the command in sequence atomically. Thus it can be used as a centralized coordination service in a cluster.
|
||||
|
||||
`TestAndSet` is the most basic operation to build distributed lock service.
|
||||
|
||||
The basic logic is to test whether the given previous value is equal to the value of the key, if equal etcd will change the value of the key to the given value.
|
||||
Etcd can be used as a centralized coordination service in a cluster and `TestAndSet` is the most basic operation to build distributed lock service. This command will set the value only if the client provided `prevValue` is equal the current key value.
|
||||
|
||||
Here is a simple example. Let's create a key-value pair first: `testAndSet=one`.
|
||||
|
||||
|
@ -206,8 +203,7 @@ The response should be
|
|||
|
||||
We successfully changed the value from “one” to “two”, since we give the correct previous value.
|
||||
|
||||
|
||||
#### Listing directory
|
||||
### Listing a directory
|
||||
|
||||
Last we provide a simple List command to list all the keys under a prefix path.
|
||||
|
||||
|
@ -221,7 +217,7 @@ We create another one `/foo/foo_dir/foo=barbarbar`
|
|||
curl -L http://127.0.0.1:4001/v1/keys/foo/foo_dir/bar -d value=barbarbar
|
||||
```
|
||||
|
||||
Let us list them next.
|
||||
Now list the keys under `/foo`
|
||||
|
||||
```sh
|
||||
curl -L http://127.0.0.1:4001/v1/keys/foo/
|
||||
|
@ -235,16 +231,20 @@ We should see the response as an array of items
|
|||
|
||||
which meas `foo=barbar` is a key-value pair under `/foo` and `foo_dir` is a directory.
|
||||
|
||||
#### Using HTTPS between server and client
|
||||
## Advanced Usage
|
||||
|
||||
### Transport security with HTTPS
|
||||
|
||||
Etcd supports SSL/TLS and client cert authentication for clients to server, as well as server to server communication
|
||||
|
||||
Before that we need to have a CA cert`clientCA.crt` and signed key pair `client.crt`, `client.key` .
|
||||
First, you need to have a CA cert `clientCA.crt` and signed key pair `client.crt`, `client.key`. This site has a good reference for how to generate self-signed key pairs:
|
||||
|
||||
This site has a good reference for how to generate self-signed key pairs
|
||||
```url
|
||||
http://www.g-loaded.eu/2005/11/10/be-your-own-ca/
|
||||
```
|
||||
|
||||
Next, lets configure etcd to use this keypair:
|
||||
|
||||
```sh
|
||||
./etcd -clientCert client.crt -clientKey client.key -f
|
||||
```
|
||||
|
@ -252,28 +252,29 @@ http://www.g-loaded.eu/2005/11/10/be-your-own-ca/
|
|||
`-f` forces new node configuration if existing configuration is found (WARNING: data loss!)
|
||||
`-clientCert` and `-clientKey` are the key and cert for transport layer security between client and server
|
||||
|
||||
```sh
|
||||
curl -L https://127.0.0.1:4001/v1/keys/foo -d value=bar -v -k
|
||||
```
|
||||
|
||||
or
|
||||
You can now test the configuration using https:
|
||||
|
||||
```sh
|
||||
curl -L https://127.0.0.1:4001/v1/keys/foo -d value=bar -v -cacert clientCA.crt
|
||||
```
|
||||
|
||||
You should be able to see the handshake succeed.
|
||||
|
||||
```
|
||||
...
|
||||
SSLv3, TLS handshake, Finished (20):
|
||||
...
|
||||
```
|
||||
|
||||
And also the response from the etcd server.
|
||||
|
||||
```json
|
||||
{"action":"SET","key":"/foo","value":"bar","newKey":true,"index":3}
|
||||
```
|
||||
|
||||
We also can do authentication using CA cert. The clients will also need to provide their cert to the server. The server will check whether the cert is signed by the CA and decide whether to serve the request.
|
||||
### Authentication with HTTPS client certificates
|
||||
|
||||
We can also do authentication using CA certs. The clients will provide their cert to the server and the server will check whether the cert is signed by the CA and decide whether to serve the request.
|
||||
|
||||
```sh
|
||||
./etcd -clientCert client.crt -clientKey client.key -clientCAFile clientCA.crt -f
|
||||
|
@ -281,29 +282,21 @@ We also can do authentication using CA cert. The clients will also need to provi
|
|||
|
||||
```-clientCAFile``` is the path to the CA cert.
|
||||
|
||||
Try the same request to this server.
|
||||
```sh
|
||||
curl -L https://127.0.0.1:4001/v1/keys/foo -d value=bar -v -k
|
||||
```
|
||||
or
|
||||
Try the same request to this server:
|
||||
|
||||
```sh
|
||||
curl -L https://127.0.0.1:4001/v1/keys/foo -d value=bar -v -cacert clientCA.crt
|
||||
```
|
||||
|
||||
The request should be rejected by the server.
|
||||
|
||||
```
|
||||
...
|
||||
routines:SSL3_READ_BYTES:sslv3 alert bad certificate
|
||||
...
|
||||
```
|
||||
|
||||
We need to give the CA signed cert to the server.
|
||||
```sh
|
||||
curl -L https://127.0.0.1:4001/v1/keys/foo -d value=bar -v --key myclient.key --cert myclient.crt -k
|
||||
```
|
||||
|
||||
or
|
||||
We need to give the CA signed cert to the server.
|
||||
|
||||
```sh
|
||||
curl -L https://127.0.0.1:4001/v1/keys/foo -d value=bar -v --key myclient.key --cert myclient.crt -cacert clientCA.crt
|
||||
|
@ -317,14 +310,17 @@ SSLv3, TLS handshake, CERT verify (15):
|
|||
TLS handshake, Finished (20)
|
||||
```
|
||||
|
||||
And also the response from the server
|
||||
And also the response from the server:
|
||||
|
||||
```json
|
||||
{"action":"SET","key":"/foo","value":"bar","newKey":true,"index":3}
|
||||
```
|
||||
|
||||
### Setting up a cluster of three machines
|
||||
## Clustering
|
||||
|
||||
Next let's explore the use of etcd clustering. We use go-raft as the underlying distributed protocol which provides consistency and persistence of the data across all of the etcd instances.
|
||||
### Example cluster of three machines
|
||||
|
||||
Let's explore the use of etcd clustering. We use go-raft as the underlying distributed protocol which provides consistency and persistence of the data across all of the etcd instances.
|
||||
|
||||
Let start by creating 3 new etcd instances.
|
||||
|
||||
|
@ -341,7 +337,7 @@ Let the join two more nodes to this cluster using the -C argument:
|
|||
./etcd -c 4003 -s 7003 -C 127.0.0.1:7001 -d nodes/node3
|
||||
```
|
||||
|
||||
Get the machines in the cluster
|
||||
Get the machines in the cluster:
|
||||
|
||||
```sh
|
||||
curl -L http://127.0.0.1:4001/machines
|
||||
|
@ -353,9 +349,9 @@ We should see there are three nodes in the cluster
|
|||
0.0.0.0:4001,0.0.0.0:4002,0.0.0.0:4003
|
||||
```
|
||||
|
||||
Machine list is also available via this API
|
||||
The machine list is also available via this API:
|
||||
|
||||
```sh
|
||||
```sh
|
||||
curl -L http://127.0.0.1:4001/v1/keys/_etcd/machines
|
||||
```
|
||||
|
||||
|
@ -382,16 +378,11 @@ Now we can do normal SET and GET operations on keys as we explored earlier.
|
|||
curl -L http://127.0.0.1:4001/v1/keys/foo -d value=bar
|
||||
```
|
||||
|
||||
When the client sends a sensitive command (`set`, `delete`, `testAndset` ) to the server, the command needs to be redirect to the leader of the cluster.
|
||||
|
||||
So we add the ` -L ` flag to make curl follow location hints in http location header when there is a redirection http response.
|
||||
|
||||
The response should be
|
||||
```json
|
||||
{"action":"SET","key":"/foo","value":"bar","newKey":true,"index":5}
|
||||
```
|
||||
|
||||
#### Killing Nodes in the Cluster
|
||||
### Killing Nodes in the Cluster
|
||||
|
||||
Let's kill the leader of the cluster and get the value from the other machine:
|
||||
|
||||
|
@ -415,9 +406,9 @@ You should be able to see this:
|
|||
{"action":"GET","key":"/foo","value":"bar","index":5}
|
||||
```
|
||||
|
||||
It succeed!
|
||||
It succeeded!
|
||||
|
||||
#### Testing Persistence
|
||||
### Testing Persistence
|
||||
|
||||
OK. Next let us kill all the nodes to test persistence. And restart all the nodes use the same command as before.
|
||||
|
||||
|
@ -431,7 +422,28 @@ curl -L http://127.0.0.1:4002/v1/keys/foo
|
|||
{"action":"GET","key":"/foo","value":"bar","index":5}
|
||||
```
|
||||
|
||||
#### Using HTTPS between servers
|
||||
### Using HTTPS between servers
|
||||
|
||||
In the previous example we showed how to use SSL client certs for client to server communication. Etcd can also do internal server to server communication using SSL client certs. To do this just change the ```-client*``` flags to ```-server*```.
|
||||
|
||||
If you are using SSL for server to server communication, you must use it on all instances of etcd.
|
||||
|
||||
## Libraries and Tools
|
||||
|
||||
**Tools**
|
||||
|
||||
- [etcdctl](https://github.com/coreos/etcdctl) - A command line client for etcd
|
||||
|
||||
**Go libraries**
|
||||
|
||||
- [go-etcd](https://github.com/coreos/go-etcd)
|
||||
|
||||
**Ruby libraries**
|
||||
|
||||
- [iconara/etcd-rb](https://github.com/iconara/etcd-rb)
|
||||
- [jpfuentes2/etcd-ruby](https://github.com/jpfuentes2/etcd-ruby)
|
||||
- [ranjib/etcd-ruby](https://github.com/ranjib/etcd-ruby)
|
||||
|
||||
**Chef Cookbook**
|
||||
|
||||
- [spheromak/etcd-cookbook](https://github.com/spheromak/etcd-cookbook)
|
||||
|
|
|
@ -45,7 +45,7 @@ func SetHttpHandler(w *http.ResponseWriter, req *http.Request) {
|
|||
return
|
||||
}
|
||||
|
||||
debugf("[recv] POST http://%v/v1/keys/%s", raftServer.Name(), key)
|
||||
debugf("[recv] POST %v/v1/keys/%s", raftServer.Name(), key)
|
||||
|
||||
value := req.FormValue("value")
|
||||
|
||||
|
@ -96,7 +96,7 @@ func SetHttpHandler(w *http.ResponseWriter, req *http.Request) {
|
|||
func DeleteHttpHandler(w *http.ResponseWriter, req *http.Request) {
|
||||
key := req.URL.Path[len("/v1/keys/"):]
|
||||
|
||||
debugf("[recv] DELETE http://%v/v1/keys/%s", raftServer.Name(), key)
|
||||
debugf("[recv] DELETE %v/v1/keys/%s", raftServer.Name(), key)
|
||||
|
||||
command := &DeleteCommand{
|
||||
Key: key,
|
||||
|
@ -172,9 +172,9 @@ func dispatch(c Command, w *http.ResponseWriter, req *http.Request, client bool)
|
|||
|
||||
if client {
|
||||
clientAddr, _ := getClientAddr(raftServer.Leader())
|
||||
url = scheme + clientAddr + path
|
||||
url = clientAddr + path
|
||||
} else {
|
||||
url = scheme + raftServer.Leader() + path
|
||||
url = raftServer.Leader() + path
|
||||
}
|
||||
|
||||
debugf("Redirect to %s", url)
|
||||
|
|
|
@ -111,9 +111,8 @@ func (c *WatchCommand) Apply(server *raft.Server) (interface{}, error) {
|
|||
// JoinCommand
|
||||
type JoinCommand struct {
|
||||
Name string `json:"name"`
|
||||
Hostname string `json:"hostName"`
|
||||
RaftPort int `json:"raftPort"`
|
||||
ClientPort int `json:"clientPort"`
|
||||
RaftURL string `json:"raftURL"`
|
||||
ClientURL string `json:"clientURL"`
|
||||
}
|
||||
|
||||
// The name of the join command in the log
|
||||
|
@ -137,12 +136,14 @@ func (c *JoinCommand) Apply(raftServer *raft.Server) (interface{}, error) {
|
|||
return []byte("join fail"), fmt.Errorf(errors[103])
|
||||
}
|
||||
|
||||
raftTransporter.AddPeer(c)
|
||||
|
||||
// add peer in raft
|
||||
err := raftServer.AddPeer(c.Name)
|
||||
|
||||
// add machine in etcd storage
|
||||
key := path.Join("_etcd/machines", c.Name)
|
||||
value := fmt.Sprintf("%s,%d,%d", c.Hostname, c.RaftPort, c.ClientPort)
|
||||
value := fmt.Sprintf("server=%s&client=%s", c.RaftURL, c.ClientURL)
|
||||
etcdStore.Set(key, value, time.Unix(0, 0), raftServer.CommitIndex())
|
||||
|
||||
return []byte("join success"), err
|
||||
|
|
236
etcd.go
236
etcd.go
|
@ -56,13 +56,14 @@ func init() {
|
|||
flag.BoolVar(&verbose, "v", false, "verbose logging")
|
||||
flag.BoolVar(&veryVerbose, "vv", false, "very verbose logging")
|
||||
|
||||
|
||||
flag.StringVar(&machines, "C", "", "the ip address and port of a existing machines in the cluster, sepearate by comma")
|
||||
flag.StringVar(&machinesFile, "CF", "", "the file contains a list of existing machines in the cluster, seperate by comma")
|
||||
|
||||
flag.StringVar(&argInfo.Hostname, "h", "0.0.0.0", "the hostname of the local machine")
|
||||
flag.IntVar(&argInfo.ClientPort, "c", 4001, "the port to communicate with clients")
|
||||
flag.IntVar(&argInfo.RaftPort, "s", 7001, "the port to communicate with servers")
|
||||
flag.IntVar(&argInfo.WebPort, "w", -1, "the port of web interface (-1 means do not start web interface)")
|
||||
flag.StringVar(&argInfo.Name, "n", "", "the node name (required)")
|
||||
flag.StringVar(&argInfo.ClientURL, "c", "127.0.0.1:4001", "the port to communicate with clients")
|
||||
flag.StringVar(&argInfo.RaftURL, "s", "127.0.0.1:7001", "the port to communicate with servers")
|
||||
flag.StringVar(&argInfo.WebURL, "w", "", "the port of web interface")
|
||||
|
||||
flag.StringVar(&argInfo.ServerCAFile, "serverCAFile", "", "the path of the CAFile")
|
||||
flag.StringVar(&argInfo.ServerCertFile, "serverCert", "", "the cert file of the server")
|
||||
|
@ -89,14 +90,8 @@ func init() {
|
|||
|
||||
// CONSTANTS
|
||||
const (
|
||||
HTTP = iota
|
||||
HTTPS
|
||||
HTTPSANDVERIFY
|
||||
)
|
||||
|
||||
const (
|
||||
SERVER = iota
|
||||
CLIENT
|
||||
RaftServer = iota
|
||||
EtcdServer
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -117,10 +112,11 @@ const (
|
|||
//------------------------------------------------------------------------------
|
||||
|
||||
type Info struct {
|
||||
Hostname string `json:"hostname"`
|
||||
RaftPort int `json:"raftPort"`
|
||||
ClientPort int `json:"clientPort"`
|
||||
WebPort int `json:"webPort"`
|
||||
Name string `json:"name"`
|
||||
|
||||
RaftURL string `json:"raftURL"`
|
||||
ClientURL string `json:"clientURL"`
|
||||
WebURL string `json:"webURL"`
|
||||
|
||||
ServerCertFile string `json:"serverCertFile"`
|
||||
ServerKeyFile string `json:"serverKeyFile"`
|
||||
|
@ -148,6 +144,21 @@ var info *Info
|
|||
//
|
||||
//------------------------------------------------------------------------------
|
||||
|
||||
// Check a URL and clean it up if the user forgot the schema
|
||||
func checkURL(u string, defaultSchema string) string {
|
||||
p, err := url.Parse(u)
|
||||
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
if len(p.Host) == 0 && len(defaultSchema) != 0 {
|
||||
return checkURL(fmt.Sprintf("%s://%s", defaultSchema, u), "")
|
||||
}
|
||||
|
||||
return p.String()
|
||||
}
|
||||
|
||||
//--------------------------------------
|
||||
// Main
|
||||
//--------------------------------------
|
||||
|
@ -190,6 +201,16 @@ func main() {
|
|||
cluster = strings.Split(string(b), ",")
|
||||
}
|
||||
|
||||
// Otherwise ask user for info and write it to file.
|
||||
argInfo.Name = strings.TrimSpace(argInfo.Name)
|
||||
|
||||
if argInfo.Name == "" {
|
||||
fatal("Please give the name of the server")
|
||||
}
|
||||
|
||||
argInfo.RaftURL = checkURL(argInfo.RaftURL, "http")
|
||||
argInfo.ClientURL = checkURL(argInfo.ClientURL, "http")
|
||||
|
||||
// Setup commands.
|
||||
registerCommands()
|
||||
|
||||
|
@ -200,39 +221,40 @@ func main() {
|
|||
|
||||
info = getInfo(dirPath)
|
||||
|
||||
// security type
|
||||
st := securityType(SERVER)
|
||||
raftTlsConfs, ok := tlsConf(RaftServer)
|
||||
if !ok {
|
||||
fatal("Please specify cert and key file or cert and key file and CAFile or none of the three")
|
||||
}
|
||||
|
||||
clientSt := securityType(CLIENT)
|
||||
|
||||
if st == -1 || clientSt == -1 {
|
||||
etcdTlsConfs, ok := tlsConf(EtcdServer)
|
||||
if !ok {
|
||||
fatal("Please specify cert and key file or cert and key file and CAFile or none of the three")
|
||||
}
|
||||
|
||||
// Create etcd key-value store
|
||||
etcdStore = store.CreateStore(maxSize)
|
||||
|
||||
startRaft(st)
|
||||
startRaft(raftTlsConfs)
|
||||
|
||||
if argInfo.WebPort != -1 {
|
||||
if argInfo.WebURL != "" {
|
||||
// start web
|
||||
etcdStore.SetMessager(storeMsg)
|
||||
go webHelper()
|
||||
go web.Start(raftServer, argInfo.WebPort)
|
||||
go web.Start(raftServer, argInfo.WebURL)
|
||||
}
|
||||
|
||||
startClientTransport(*info, clientSt)
|
||||
startEtcdTransport(*info, etcdTlsConfs[0])
|
||||
|
||||
}
|
||||
|
||||
// Start the raft server
|
||||
func startRaft(securityType int) {
|
||||
func startRaft(tlsConfs []*tls.Config) {
|
||||
var err error
|
||||
|
||||
raftName := fmt.Sprintf("%s:%d", info.Hostname, info.RaftPort)
|
||||
raftName := info.Name
|
||||
|
||||
// Create transporter for raft
|
||||
raftTransporter = createTransporter(securityType)
|
||||
raftTransporter = newTransporter(tlsConfs[1])
|
||||
|
||||
// Create raft server
|
||||
raftServer, err = raft.NewServer(raftName, dirPath, raftTransporter, etcdStore, nil)
|
||||
|
@ -267,10 +289,9 @@ func startRaft(securityType int) {
|
|||
// leader need to join self as a peer
|
||||
for {
|
||||
command := &JoinCommand{
|
||||
Name: raftServer.Name(),
|
||||
Hostname: argInfo.Hostname,
|
||||
RaftPort: argInfo.RaftPort,
|
||||
ClientPort: argInfo.ClientPort,
|
||||
Name: raftServer.Name(),
|
||||
RaftURL: argInfo.RaftURL,
|
||||
ClientURL: argInfo.ClientURL,
|
||||
}
|
||||
_, err := raftServer.Do(command)
|
||||
if err == nil {
|
||||
|
@ -328,44 +349,32 @@ func startRaft(securityType int) {
|
|||
}
|
||||
|
||||
// start to response to raft requests
|
||||
go startRaftTransport(*info, securityType)
|
||||
go startRaftTransport(*info, tlsConfs[0])
|
||||
|
||||
}
|
||||
|
||||
// Create transporter using by raft server
|
||||
// Create http or https transporter based on
|
||||
// whether the user give the server cert and key
|
||||
func createTransporter(st int) transporter {
|
||||
func newTransporter(tlsConf *tls.Config) transporter {
|
||||
t := transporter{}
|
||||
|
||||
switch st {
|
||||
case HTTP:
|
||||
t.names = make(map[string]*JoinCommand)
|
||||
|
||||
if tlsConf == nil {
|
||||
t.scheme = "http://"
|
||||
|
||||
tr := &http.Transport{
|
||||
Dial: dialTimeout,
|
||||
}
|
||||
|
||||
t.client = &http.Client{
|
||||
Transport: tr,
|
||||
Transport: &http.Transport{
|
||||
Dial: dialTimeout,
|
||||
},
|
||||
}
|
||||
|
||||
case HTTPS:
|
||||
fallthrough
|
||||
case HTTPSANDVERIFY:
|
||||
} else {
|
||||
t.scheme = "https://"
|
||||
|
||||
tlsCert, err := tls.LoadX509KeyPair(argInfo.ServerCertFile, argInfo.ServerKeyFile)
|
||||
|
||||
if err != nil {
|
||||
fatal(err)
|
||||
}
|
||||
|
||||
tr := &http.Transport{
|
||||
TLSClientConfig: &tls.Config{
|
||||
Certificates: []tls.Certificate{tlsCert},
|
||||
InsecureSkipVerify: true,
|
||||
},
|
||||
TLSClientConfig: tlsConf,
|
||||
Dial: dialTimeout,
|
||||
DisableCompression: true,
|
||||
}
|
||||
|
@ -382,9 +391,10 @@ func dialTimeout(network, addr string) (net.Conn, error) {
|
|||
}
|
||||
|
||||
// Start to listen and response raft command
|
||||
func startRaftTransport(info Info, st int) {
|
||||
func startRaftTransport(info Info, tlsConf *tls.Config) {
|
||||
|
||||
// internal commands
|
||||
http.HandleFunc("/name", NameHttpHandler)
|
||||
http.HandleFunc("/join", JoinHttpHandler)
|
||||
http.HandleFunc("/vote", VoteHttpHandler)
|
||||
http.HandleFunc("/log", GetLogHttpHandler)
|
||||
|
@ -393,33 +403,23 @@ func startRaftTransport(info Info, st int) {
|
|||
http.HandleFunc("/snapshotRecovery", SnapshotRecoveryHttpHandler)
|
||||
http.HandleFunc("/client", ClientHttpHandler)
|
||||
|
||||
switch st {
|
||||
|
||||
case HTTP:
|
||||
fmt.Printf("raft server [%s] listen on http port %v\n", info.Hostname, info.RaftPort)
|
||||
fatal(http.ListenAndServe(fmt.Sprintf(":%d", info.RaftPort), nil))
|
||||
|
||||
case HTTPS:
|
||||
fmt.Printf("raft server [%s] listen on https port %v\n", info.Hostname, info.RaftPort)
|
||||
fatal(http.ListenAndServeTLS(fmt.Sprintf(":%d", info.RaftPort), info.ServerCertFile, argInfo.ServerKeyFile, nil))
|
||||
|
||||
case HTTPSANDVERIFY:
|
||||
u, _ := url.Parse(info.RaftURL)
|
||||
fmt.Printf("raft server [%s] listening on %s\n", info.Name, u)
|
||||
|
||||
if tlsConf == nil {
|
||||
http.ListenAndServe(u.Host, nil)
|
||||
} else {
|
||||
server := &http.Server{
|
||||
TLSConfig: &tls.Config{
|
||||
ClientAuth: tls.RequireAndVerifyClientCert,
|
||||
ClientCAs: createCertPool(info.ServerCAFile),
|
||||
},
|
||||
Addr: fmt.Sprintf(":%d", info.RaftPort),
|
||||
TLSConfig: tlsConf,
|
||||
Addr: u.Host,
|
||||
}
|
||||
fmt.Printf("raft server [%s] listen on https port %v\n", info.Hostname, info.RaftPort)
|
||||
fatal(server.ListenAndServeTLS(info.ServerCertFile, argInfo.ServerKeyFile))
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// Start to listen and response client command
|
||||
func startClientTransport(info Info, st int) {
|
||||
func startEtcdTransport(info Info, tlsConf *tls.Config) {
|
||||
// external commands
|
||||
http.HandleFunc("/"+version+"/keys/", Multiplexer)
|
||||
http.HandleFunc("/"+version+"/watch/", WatchHttpHandler)
|
||||
|
@ -429,26 +429,16 @@ func startClientTransport(info Info, st int) {
|
|||
http.HandleFunc("/stats", StatsHttpHandler)
|
||||
http.HandleFunc("/test/", TestHttpHandler)
|
||||
|
||||
switch st {
|
||||
|
||||
case HTTP:
|
||||
fmt.Printf("etcd [%s] listen on http port %v\n", info.Hostname, info.ClientPort)
|
||||
fatal(http.ListenAndServe(fmt.Sprintf(":%d", info.ClientPort), nil))
|
||||
|
||||
case HTTPS:
|
||||
fmt.Printf("etcd [%s] listen on https port %v\n", info.Hostname, info.ClientPort)
|
||||
http.ListenAndServeTLS(fmt.Sprintf(":%d", info.ClientPort), info.ClientCertFile, info.ClientKeyFile, nil)
|
||||
|
||||
case HTTPSANDVERIFY:
|
||||
u, _ := url.Parse(info.ClientURL)
|
||||
fmt.Printf("raft server [%s] listening on %s\n", info.Name, u)
|
||||
|
||||
if tlsConf == nil {
|
||||
fatal(http.ListenAndServe(u.Host, nil))
|
||||
} else {
|
||||
server := &http.Server{
|
||||
TLSConfig: &tls.Config{
|
||||
ClientAuth: tls.RequireAndVerifyClientCert,
|
||||
ClientCAs: createCertPool(info.ClientCAFile),
|
||||
},
|
||||
Addr: fmt.Sprintf(":%d", info.ClientPort),
|
||||
TLSConfig: tlsConf,
|
||||
Addr: u.Host,
|
||||
}
|
||||
fmt.Printf("etcd [%s] listen on https port %v\n", info.Hostname, info.ClientPort)
|
||||
fatal(server.ListenAndServeTLS(info.ClientCertFile, info.ClientKeyFile))
|
||||
}
|
||||
}
|
||||
|
@ -456,20 +446,28 @@ func startClientTransport(info Info, st int) {
|
|||
//--------------------------------------
|
||||
// Config
|
||||
//--------------------------------------
|
||||
|
||||
// Get the security type
|
||||
func securityType(source int) int {
|
||||
|
||||
func tlsConf(source int) ([]*tls.Config, bool) {
|
||||
var keyFile, certFile, CAFile string
|
||||
var tlsCert tls.Certificate
|
||||
var isAuth bool
|
||||
var err error
|
||||
|
||||
switch source {
|
||||
|
||||
case SERVER:
|
||||
case RaftServer:
|
||||
keyFile = info.ServerKeyFile
|
||||
certFile = info.ServerCertFile
|
||||
CAFile = info.ServerCAFile
|
||||
|
||||
case CLIENT:
|
||||
if keyFile != "" && certFile != "" {
|
||||
tlsCert, err = tls.LoadX509KeyPair(certFile, keyFile)
|
||||
if err == nil {
|
||||
fatal(err)
|
||||
}
|
||||
isAuth = true
|
||||
}
|
||||
|
||||
case EtcdServer:
|
||||
keyFile = info.ClientKeyFile
|
||||
certFile = info.ClientCertFile
|
||||
CAFile = info.ClientCAFile
|
||||
|
@ -478,25 +476,28 @@ func securityType(source int) int {
|
|||
// If the user do not specify key file, cert file and
|
||||
// CA file, the type will be HTTP
|
||||
if keyFile == "" && certFile == "" && CAFile == "" {
|
||||
|
||||
return HTTP
|
||||
|
||||
return []*tls.Config{nil, nil}, true
|
||||
}
|
||||
|
||||
if keyFile != "" && certFile != "" {
|
||||
if CAFile != "" {
|
||||
// If the user specify all the three file, the type
|
||||
// will be HTTPS with client cert auth
|
||||
return HTTPSANDVERIFY
|
||||
serverConf := &tls.Config{}
|
||||
serverConf.ClientAuth, serverConf.ClientCAs = newCertPool(CAFile)
|
||||
|
||||
if isAuth {
|
||||
raftTransConf := &tls.Config{
|
||||
Certificates: []tls.Certificate{tlsCert},
|
||||
InsecureSkipVerify: true,
|
||||
}
|
||||
return []*tls.Config{serverConf, raftTransConf}, true
|
||||
}
|
||||
// If the user specify key file and cert file but not
|
||||
// CA file, the type will be HTTPS without client cert
|
||||
// auth
|
||||
return HTTPS
|
||||
|
||||
return []*tls.Config{serverConf, nil}, true
|
||||
|
||||
}
|
||||
|
||||
// bad specification
|
||||
return -1
|
||||
return nil, false
|
||||
|
||||
}
|
||||
|
||||
func parseInfo(path string) *Info {
|
||||
|
@ -547,13 +548,6 @@ func getInfo(path string) *Info {
|
|||
return info
|
||||
}
|
||||
|
||||
// Otherwise ask user for info and write it to file.
|
||||
argInfo.Hostname = strings.TrimSpace(argInfo.Hostname)
|
||||
|
||||
if argInfo.Hostname == "" {
|
||||
fatal("Please give the address of the local machine")
|
||||
}
|
||||
|
||||
info = &argInfo
|
||||
|
||||
// Write to file.
|
||||
|
@ -569,7 +563,10 @@ func getInfo(path string) *Info {
|
|||
}
|
||||
|
||||
// Create client auth certpool
|
||||
func createCertPool(CAFile string) *x509.CertPool {
|
||||
func newCertPool(CAFile string) (tls.ClientAuthType, *x509.CertPool) {
|
||||
if CAFile == "" {
|
||||
return tls.NoClientCert, nil
|
||||
}
|
||||
pemByte, _ := ioutil.ReadFile(CAFile)
|
||||
|
||||
block, pemByte := pem.Decode(pemByte)
|
||||
|
@ -584,7 +581,7 @@ func createCertPool(CAFile string) *x509.CertPool {
|
|||
|
||||
certPool.AddCert(cert)
|
||||
|
||||
return certPool
|
||||
return tls.RequireAndVerifyClientCert, certPool
|
||||
}
|
||||
|
||||
// Send join requests to the leader.
|
||||
|
@ -593,9 +590,8 @@ func joinCluster(s *raft.Server, serverName string) error {
|
|||
|
||||
command := &JoinCommand{
|
||||
Name: s.Name(),
|
||||
Hostname: info.Hostname,
|
||||
RaftPort: info.RaftPort,
|
||||
ClientPort: info.ClientPort,
|
||||
RaftURL: info.RaftURL,
|
||||
ClientURL: info.ClientURL,
|
||||
}
|
||||
|
||||
json.NewEncoder(&b).Encode(command)
|
||||
|
|
|
@ -36,7 +36,7 @@ func TestKillLeader(t *testing.T) {
|
|||
|
||||
leader := "127.0.0.1:7001"
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
for i := 0; i < clusterSize; i++ {
|
||||
port, _ := strconv.Atoi(strings.Split(leader, ":")[1])
|
||||
num := port - 7001
|
||||
fmt.Println("kill server ", num)
|
||||
|
|
12
machines.go
12
machines.go
|
@ -1,20 +1,20 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"path"
|
||||
"strings"
|
||||
"net/url"
|
||||
)
|
||||
|
||||
func getClientAddr(name string) (string, bool) {
|
||||
response, _ := etcdStore.RawGet(path.Join("_etcd/machines", name))
|
||||
|
||||
values := strings.Split(response[0].Value, ",")
|
||||
m, err := url.ParseQuery(response[0].Value)
|
||||
|
||||
hostname := values[0]
|
||||
clientPort := values[2]
|
||||
if err != nil {
|
||||
panic("Failed to parse machines entry")
|
||||
}
|
||||
|
||||
addr := fmt.Sprintf("%s:%s", hostname, clientPort)
|
||||
addr := m["client"][0]
|
||||
|
||||
return addr, true
|
||||
}
|
||||
|
|
|
@ -4,7 +4,6 @@ import (
|
|||
"encoding/json"
|
||||
"github.com/coreos/go-raft"
|
||||
"net/http"
|
||||
"strconv"
|
||||
)
|
||||
|
||||
//-------------------------------------------------------------
|
||||
|
@ -91,7 +90,7 @@ func SnapshotRecoveryHttpHandler(w http.ResponseWriter, req *http.Request) {
|
|||
func ClientHttpHandler(w http.ResponseWriter, req *http.Request) {
|
||||
debugf("[recv] Get %s/client/ ", raftTransporter.scheme+raftServer.Name())
|
||||
w.WriteHeader(http.StatusOK)
|
||||
client := argInfo.Hostname + ":" + strconv.Itoa(argInfo.ClientPort)
|
||||
client := argInfo.ClientURL
|
||||
w.Write([]byte(client))
|
||||
}
|
||||
|
||||
|
@ -108,3 +107,16 @@ func JoinHttpHandler(w http.ResponseWriter, req *http.Request) {
|
|||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Response to the join request
|
||||
func NameHttpHandler(w http.ResponseWriter, req *http.Request) {
|
||||
command := &JoinCommand{}
|
||||
|
||||
if err := decodeJsonRequest(req, command); err == nil {
|
||||
debugf("Receive Join Request from %s", command.Name)
|
||||
dispatch(command, &w, req, false)
|
||||
} else {
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
|
30
test.go
30
test.go
|
@ -9,6 +9,7 @@ import (
|
|||
"os"
|
||||
"strconv"
|
||||
"time"
|
||||
"net/url"
|
||||
)
|
||||
|
||||
var client = http.Client{
|
||||
|
@ -59,10 +60,10 @@ func createCluster(size int, procAttr *os.ProcAttr) ([][]string, []*os.Process,
|
|||
argGroup := make([][]string, size)
|
||||
for i := 0; i < size; i++ {
|
||||
if i == 0 {
|
||||
argGroup[i] = []string{"etcd", "-h=127.0.0.1", "-d=/tmp/node1"}
|
||||
argGroup[i] = []string{"etcd", "-d=/tmp/node1", "-n=node1", "-vv"}
|
||||
} else {
|
||||
strI := strconv.Itoa(i + 1)
|
||||
argGroup[i] = []string{"etcd", "-h=127.0.0.1", "-c=400" + strI, "-s=700" + strI, "-d=/tmp/node" + strI, "-C=127.0.0.1:7001"}
|
||||
argGroup[i] = []string{"etcd", "-n=node" + strI, "-c=127.0.0.1:400" + strI, "-s=127.0.0.1:700" + strI, "-d=/tmp/node" + strI, "-C=http://127.0.0.1:7001"}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -103,7 +104,7 @@ func destroyCluster(etcds []*os.Process) error {
|
|||
//
|
||||
func leaderMonitor(size int, allowDeadNum int, leaderChan chan string) {
|
||||
leaderMap := make(map[int]string)
|
||||
baseAddrFormat := "http://0.0.0.0:400%d/leader"
|
||||
baseAddrFormat := "http://0.0.0.0:400%d"
|
||||
|
||||
for {
|
||||
knownLeader := "unknown"
|
||||
|
@ -151,7 +152,7 @@ func leaderMonitor(size int, allowDeadNum int, leaderChan chan string) {
|
|||
|
||||
func getLeader(addr string) (string, error) {
|
||||
|
||||
resp, err := client.Get(addr)
|
||||
resp, err := client.Get(addr + "/leader")
|
||||
|
||||
if err != nil {
|
||||
return "", err
|
||||
|
@ -163,14 +164,31 @@ func getLeader(addr string) (string, error) {
|
|||
}
|
||||
|
||||
b, err := ioutil.ReadAll(resp.Body)
|
||||
|
||||
resp.Body.Close()
|
||||
|
||||
c := etcd.NewClient()
|
||||
path := "/_etcd/machines/" + string(b)
|
||||
fmt.Println(path)
|
||||
fmt.Println(addr)
|
||||
response, err := c.GetFrom(path, addr)
|
||||
fmt.Println(response)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
m, err := url.ParseQuery(response[0].Value)
|
||||
|
||||
if err != nil {
|
||||
panic("Failed to parse machines entry")
|
||||
}
|
||||
|
||||
addr = m["server"][0]
|
||||
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
return string(b), nil
|
||||
return addr, nil
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -139,7 +139,6 @@ func (c *Client) internalSyncCluster(machines []string) bool {
|
|||
// serverName should contain both hostName and port
|
||||
func (c *Client) createHttpPath(serverName string, _path string) string {
|
||||
httpPath := path.Join(serverName, _path)
|
||||
httpPath = c.config.Scheme + "://" + httpPath
|
||||
return httpPath
|
||||
}
|
||||
|
||||
|
|
|
@ -15,6 +15,19 @@ type transporter struct {
|
|||
client *http.Client
|
||||
// scheme
|
||||
scheme string
|
||||
names map[string]*JoinCommand
|
||||
}
|
||||
|
||||
func (t transporter) NameToRaftURL(name string) string {
|
||||
return t.names[name].RaftURL
|
||||
}
|
||||
|
||||
func (t transporter) NameToClientURL(name string) string {
|
||||
return t.names[name].ClientURL
|
||||
}
|
||||
|
||||
func (t transporter) AddPeer(jc *JoinCommand) {
|
||||
t.names[jc.Name] = jc
|
||||
}
|
||||
|
||||
// Sends AppendEntries RPCs to a peer when the server is the leader.
|
||||
|
@ -23,12 +36,13 @@ func (t transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.Pe
|
|||
var b bytes.Buffer
|
||||
json.NewEncoder(&b).Encode(req)
|
||||
|
||||
debugf("Send LogEntries to %s ", peer.Name())
|
||||
u := t.NameToRaftURL(peer.Name())
|
||||
debugf("Send LogEntries to %s ", u)
|
||||
|
||||
resp, err := t.Post(fmt.Sprintf("%s/log/append", peer.Name()), &b)
|
||||
resp, err := t.Post(fmt.Sprintf("%s/log/append", u), &b)
|
||||
|
||||
if err != nil {
|
||||
debugf("Cannot send AppendEntriesRequest to %s : %s", peer.Name(), err)
|
||||
debugf("Cannot send AppendEntriesRequest to %s: %s", u, err)
|
||||
}
|
||||
|
||||
if resp != nil {
|
||||
|
@ -48,12 +62,13 @@ func (t transporter) SendVoteRequest(server *raft.Server, peer *raft.Peer, req *
|
|||
var b bytes.Buffer
|
||||
json.NewEncoder(&b).Encode(req)
|
||||
|
||||
debugf("Send Vote to %s", peer.Name())
|
||||
u := t.NameToRaftURL(peer.Name())
|
||||
debugf("Send Vote to %s", u)
|
||||
|
||||
resp, err := t.Post(fmt.Sprintf("%s/vote", peer.Name()), &b)
|
||||
resp, err := t.Post(fmt.Sprintf("%s/vote", u), &b)
|
||||
|
||||
if err != nil {
|
||||
debugf("Cannot send VoteRequest to %s : %s", peer.Name(), err)
|
||||
debugf("Cannot send VoteRequest to %s : %s", u, err)
|
||||
}
|
||||
|
||||
if resp != nil {
|
||||
|
@ -73,10 +88,11 @@ func (t transporter) SendSnapshotRequest(server *raft.Server, peer *raft.Peer, r
|
|||
var b bytes.Buffer
|
||||
json.NewEncoder(&b).Encode(req)
|
||||
|
||||
debugf("Send Snapshot to %s [Last Term: %d, LastIndex %d]", peer.Name(),
|
||||
u := t.NameToRaftURL(peer.Name())
|
||||
debugf("Send Snapshot to %s [Last Term: %d, LastIndex %d]", u,
|
||||
req.LastTerm, req.LastIndex)
|
||||
|
||||
resp, err := t.Post(fmt.Sprintf("%s/snapshot", peer.Name()), &b)
|
||||
resp, err := t.Post(fmt.Sprintf("%s/snapshot", u), &b)
|
||||
|
||||
if resp != nil {
|
||||
defer resp.Body.Close()
|
||||
|
@ -95,10 +111,11 @@ func (t transporter) SendSnapshotRecoveryRequest(server *raft.Server, peer *raft
|
|||
var b bytes.Buffer
|
||||
json.NewEncoder(&b).Encode(req)
|
||||
|
||||
debugf("Send SnapshotRecovery to %s [Last Term: %d, LastIndex %d]", peer.Name(),
|
||||
u := t.NameToRaftURL(peer.Name())
|
||||
debugf("Send SnapshotRecovery to %s [Last Term: %d, LastIndex %d]", u,
|
||||
req.LastTerm, req.LastIndex)
|
||||
|
||||
resp, err := t.Post(fmt.Sprintf("%s/snapshotRecovery", peer.Name()), &b)
|
||||
resp, err := t.Post(fmt.Sprintf("%s/snapshotRecovery", u), &b)
|
||||
|
||||
if resp != nil {
|
||||
defer resp.Body.Close()
|
||||
|
@ -123,12 +140,12 @@ func (t transporter) GetLeaderClientAddress() string {
|
|||
|
||||
// Send server side POST request
|
||||
func (t transporter) Post(path string, body io.Reader) (*http.Response, error) {
|
||||
resp, err := t.client.Post(t.scheme+path, "application/json", body)
|
||||
resp, err := t.client.Post(path, "application/json", body)
|
||||
return resp, err
|
||||
}
|
||||
|
||||
// Send server side GET request
|
||||
func (t transporter) Get(path string) (*http.Response, error) {
|
||||
resp, err := t.client.Get(t.scheme + path)
|
||||
resp, err := t.client.Get(path)
|
||||
return resp, err
|
||||
}
|
||||
|
|
|
@ -24,7 +24,8 @@ func mainHandler(c http.ResponseWriter, req *http.Request) {
|
|||
mainTempl.Execute(c, p)
|
||||
}
|
||||
|
||||
func Start(server *raft.Server, port int) {
|
||||
func Start(server *raft.Server, webURL string) {
|
||||
port := "4002"
|
||||
mainTempl = template.Must(template.New("index.html").Parse(index_html))
|
||||
s = server
|
||||
|
||||
|
|
Loading…
Reference in New Issue