etcdserver: introduce the cluster and member

This introduces two new concepts: the cluster and the member.

Members are logical etcd instances that have a name, raft ID, and a list
of peer and client addresses.

A cluster is made up of a list of members.
release-2.0
Brandon Philips 2014-09-24 19:51:27 -07:00
parent 56c64ab2e8
commit 5e3fd6ee3f
13 changed files with 654 additions and 477 deletions

View File

@ -1,5 +1,5 @@
# Use goreman to run `go get github.com/mattn/goreman`
etcd1: bin/etcd -id 0x1 -bind-addr 127.0.0.1:4001 -peer-bind-addr :7001 -peers '0x1=localhost:7001&0x2=localhost:7002&0x3=localhost:7003'
etcd2: bin/etcd -id 0x2 -bind-addr 127.0.0.1:4002 -peer-bind-addr :7002 -peers '0x1=localhost:7001&0x2=localhost:7002&0x3=localhost:7003'
etcd3: bin/etcd -id 0x3 -bind-addr 127.0.0.1:4003 -peer-bind-addr :7003 -peers '0x1=localhost:7001&0x2=localhost:7002&0x3=localhost:7003'
proxy: bin/etcd -proxy=on -bind-addr 127.0.0.1:8080 -peers '0x1=localhost:7001&0x2=localhost:7002&0x3=localhost:7003'
etcd1: bin/etcd -name node1 -bind-addr 127.0.0.1:4001 -peer-bind-addr :7001 -bootstrap-config 'node1=localhost:7001,node2=localhost:7002,node3=localhost:7003'
etcd2: bin/etcd -name node2 -bind-addr 127.0.0.1:4002 -peer-bind-addr :7002 -bootstrap-config 'node1=localhost:7001,node2=localhost:7002,node3=localhost:7003'
etcd3: bin/etcd -name node3 -bind-addr 127.0.0.1:4003 -peer-bind-addr :7003 -bootstrap-config 'node1=localhost:7001,node2=localhost:7002,node3=localhost:7003'
#proxy: bin/etcd -proxy=on -bind-addr 127.0.0.1:8080 -peers 'localhost:7001,localhost:7002,localhost:7003'

114
etcdserver/cluster.go Normal file
View File

@ -0,0 +1,114 @@
package etcdserver
import (
"fmt"
"math/rand"
"net/url"
"sort"
"strings"
)
// Cluster is a list of Members that belong to the same raft cluster
type Cluster map[int64]*Member
func (c Cluster) FindID(id int64) *Member {
return c[id]
}
func (c Cluster) FindName(name string) *Member {
for _, m := range c {
if m.Name == name {
return m
}
}
return nil
}
func (c Cluster) Add(m Member) error {
if c.FindID(m.ID) != nil {
return fmt.Errorf("Member exists with identical ID %v", m)
}
c[m.ID] = &m
return nil
}
func (c *Cluster) AddSlice(mems []Member) error {
for _, m := range mems {
err := c.Add(m)
if err != nil {
return err
}
}
return nil
}
// Pick chooses a random address from a given Member's addresses, and returns it as
// an addressible URI. If the given member does not exist, an empty string is returned.
func (c Cluster) Pick(id int64) string {
if m := c.FindID(id); m != nil {
addrs := m.PeerURLs
if len(addrs) == 0 {
return ""
}
return addrs[rand.Intn(len(addrs))]
}
return ""
}
// Set parses command line sets of names to IPs formatted like:
// mach0=1.1.1.1,mach0=2.2.2.2,mach0=1.1.1.1,mach1=2.2.2.2,mach1=3.3.3.3
func (c *Cluster) Set(s string) error {
*c = Cluster{}
v, err := url.ParseQuery(strings.Replace(s, ",", "&", -1))
if err != nil {
return err
}
for name, urls := range v {
if len(urls) == 0 || urls[0] == "" {
return fmt.Errorf("Empty URL given for %q", name)
}
m := newMember(name, urls)
err := c.Add(*m)
if err != nil {
return err
}
}
return nil
}
func (c Cluster) String() string {
sl := []string{}
for _, m := range c {
for _, u := range m.PeerURLs {
sl = append(sl, fmt.Sprintf("%s=%s", m.Name, u))
}
}
sort.Strings(sl)
return strings.Join(sl, ",")
}
func (c Cluster) IDs() []int64 {
var ids []int64
for _, m := range c {
ids = append(ids, m.ID)
}
return ids
}
// Endpoints returns a list of all peer addresses. Each address is prefixed
// with the scheme (currently "http://"). The returned list is sorted in
// ascending lexicographical order.
func (c Cluster) Endpoints() []string {
endpoints := make([]string, 0)
for _, p := range c {
for _, addr := range p.PeerURLs {
endpoints = append(endpoints, addScheme(addr))
}
}
sort.Strings(endpoints)
return endpoints
}

141
etcdserver/cluster_store.go Normal file
View File

@ -0,0 +1,141 @@
package etcdserver
import (
"bytes"
"encoding/json"
"fmt"
"log"
"net/http"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/store"
)
const (
raftPrefix = "/raft"
)
type ClusterStore interface {
Get() Cluster
Delete(id int64)
}
type clusterStore struct {
Store store.Store
}
func NewClusterStore(st store.Store, c Cluster) ClusterStore {
cls := &clusterStore{Store: st}
for _, m := range c {
cls.add(*m)
}
return cls
}
// add puts a new Member into the store.
// A Member with a matching id must not exist.
func (s *clusterStore) add(m Member) {
b, err := json.Marshal(m)
if err != nil {
log.Panicf("marshal peer info error: %v", err)
}
if _, err := s.Store.Create(m.storeKey(), false, string(b), false, store.Permanent); err != nil {
log.Panicf("add member should never fail: %v", err)
}
}
// TODO(philips): keep the latest copy without going to the store to avoid the
// lock here.
func (s *clusterStore) Get() Cluster {
c := &Cluster{}
e, err := s.Store.Get(machineKVPrefix, true, false)
if err != nil {
log.Panicf("get member should never fail: %v", err)
}
for _, n := range e.Node.Nodes {
m := Member{}
if err := json.Unmarshal([]byte(*n.Value), &m); err != nil {
log.Panicf("unmarshal peer error: %v", err)
}
err := c.Add(m)
if err != nil {
log.Panicf("add member to cluster should never fail: %v", err)
}
}
return *c
}
// Delete removes a member from the store.
// The given id MUST exist.
func (s *clusterStore) Delete(id int64) {
p := s.Get().FindID(id).storeKey()
if _, err := s.Store.Delete(p, false, false); err != nil {
log.Panicf("delete peer should never fail: %v", err)
}
}
// addScheme adds the protocol prefix to a string; currently only HTTP
// TODO: improve this when implementing TLS
func addScheme(addr string) string {
return fmt.Sprintf("http://%s", addr)
}
func Sender(t *http.Transport, cls ClusterStore) func(msgs []raftpb.Message) {
c := &http.Client{Transport: t}
scheme := "http"
if t.TLSClientConfig != nil {
scheme = "https"
}
return func(msgs []raftpb.Message) {
for _, m := range msgs {
// TODO: reuse go routines
// limit the number of outgoing connections for the same receiver
go send(c, scheme, cls, m)
}
}
}
func send(c *http.Client, scheme string, cls ClusterStore, m raftpb.Message) {
// TODO (xiangli): reasonable retry logic
for i := 0; i < 3; i++ {
addr := cls.Get().Pick(m.To)
if addr == "" {
// TODO: unknown peer id.. what do we do? I
// don't think his should ever happen, need to
// look into this further.
log.Printf("etcdhttp: no addr for %d", m.To)
return
}
url := fmt.Sprintf("%s://%s%s", scheme, addr, raftPrefix)
// TODO: don't block. we should be able to have 1000s
// of messages out at a time.
data, err := m.Marshal()
if err != nil {
log.Println("etcdhttp: dropping message:", err)
return // drop bad message
}
if httpPost(c, url, data) {
return // success
}
// TODO: backoff
}
}
func httpPost(c *http.Client, url string, data []byte) bool {
resp, err := c.Post(url, "application/protobuf", bytes.NewBuffer(data))
if err != nil {
// TODO: log the error?
return false
}
resp.Body.Close()
if resp.StatusCode != http.StatusNoContent {
// TODO: log the error?
return false
}
return true
}

View File

@ -0,0 +1,115 @@
package etcdserver
import (
"reflect"
"testing"
"time"
etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/store"
)
func TestClusterStoreGet(t *testing.T) {
tests := []struct {
mems []Member
wmems []Member
}{
{
[]Member{{Name: "node1", ID: 1}},
[]Member{{Name: "node1", ID: 1}},
},
{
[]Member{},
[]Member{},
},
{
[]Member{{Name: "node1", ID: 1}, {Name: "node2", ID: 2}},
[]Member{{Name: "node1", ID: 1}, {Name: "node2", ID: 2}},
},
{
[]Member{{Name: "node2", ID: 2}, {Name: "node1", ID: 1}},
[]Member{{Name: "node1", ID: 1}, {Name: "node2", ID: 2}},
},
}
for i, tt := range tests {
c := Cluster{}
err := c.AddSlice(tt.mems)
if err != nil {
t.Error(err)
}
cs := NewClusterStore(&getAllStore{}, c)
if g := cs.Get(); !reflect.DeepEqual(g, c) {
t.Errorf("#%d: mems = %v, want %v", i, g, c)
}
}
}
func TestClusterStoreDelete(t *testing.T) {
st := &storeGetAllDeleteRecorder{}
c := Cluster{}
c.Add(Member{Name: "node", ID: 1})
cs := NewClusterStore(st, c)
cs.Delete(1)
wdeletes := []string{machineKVPrefix + "1"}
if !reflect.DeepEqual(st.deletes, wdeletes) {
t.Error("deletes = %v, want %v", st.deletes, wdeletes)
}
}
// simpleStore implements basic create and get.
type simpleStore struct {
storeRecorder
st map[string]string
}
func (s *simpleStore) Create(key string, _ bool, value string, _ bool, _ time.Time) (*store.Event, error) {
if s.st == nil {
s.st = make(map[string]string)
}
s.st[key] = value
return nil, nil
}
func (s *simpleStore) Get(key string, _, _ bool) (*store.Event, error) {
val, ok := s.st[key]
if !ok {
return nil, etcdErr.NewError(etcdErr.EcodeKeyNotFound, "", 0)
}
ev := &store.Event{Node: &store.NodeExtern{Key: key, Value: stringp(val)}}
return ev, nil
}
// getAllStore inherits simpleStore, and makes Get return all keys.
type getAllStore struct {
simpleStore
}
func (s *getAllStore) Get(_ string, _, _ bool) (*store.Event, error) {
nodes := make([]*store.NodeExtern, 0)
for k, v := range s.st {
nodes = append(nodes, &store.NodeExtern{Key: k, Value: stringp(v)})
}
return &store.Event{Node: &store.NodeExtern{Nodes: nodes}}, nil
}
type storeDeleteRecorder struct {
storeRecorder
deletes []string
}
func (s *storeDeleteRecorder) Delete(key string, _, _ bool) (*store.Event, error) {
s.deletes = append(s.deletes, key)
return nil, nil
}
type storeGetAllDeleteRecorder struct {
getAllStore
deletes []string
}
func (s *storeGetAllDeleteRecorder) Delete(key string, _, _ bool) (*store.Event, error) {
s.deletes = append(s.deletes, key)
return nil, nil
}

143
etcdserver/cluster_test.go Normal file
View File

@ -0,0 +1,143 @@
package etcdserver
import (
"testing"
)
func TestClusterFind(t *testing.T) {
tests := []struct {
id int64
name string
mems []Member
match bool
}{
{
1,
"node1",
[]Member{{Name: "node1", ID: 1}},
true,
},
{
2,
"foobar",
[]Member{},
false,
},
{
2,
"node2",
[]Member{{Name: "node1", ID: 1}, {Name: "node2", ID: 2}},
true,
},
{
3,
"node3",
[]Member{{Name: "node1", ID: 1}, {Name: "node2", ID: 2}},
false,
},
}
for i, tt := range tests {
c := Cluster{}
c.AddSlice(tt.mems)
m := c.FindName(tt.name)
if m == nil && !tt.match {
continue
}
if m == nil && tt.match {
t.Errorf("#%d: expected match got empty", i)
}
if m.Name != tt.name && tt.match {
t.Errorf("#%d: got = %v, want %v", i, m.Name, tt.name)
}
}
for i, tt := range tests {
c := Cluster{}
c.AddSlice(tt.mems)
m := c.FindID(tt.id)
if m == nil && !tt.match {
continue
}
if m == nil && tt.match {
t.Errorf("#%d: expected match got empty", i)
}
if m.ID != tt.id && tt.match {
t.Errorf("#%d: got = %v, want %v", i, m.Name, tt.id)
}
}
}
func TestClusterSet(t *testing.T) {
tests := []struct {
f string
mems []Member
parse bool
}{
{
"mem1=10.0.0.1:2379,mem1=128.193.4.20:2379,mem2=10.0.0.2:2379,default=127.0.0.1:2379",
[]Member{
{ID: 3736794188555456841, Name: "mem1", PeerURLs: []string{"10.0.0.1:2379", "128.193.4.20:2379"}},
{ID: 5674507346857578431, Name: "mem2", PeerURLs: []string{"10.0.0.2:2379"}},
{ID: 2676999861503984872, Name: "default", PeerURLs: []string{"127.0.0.1:2379"}},
},
true,
},
}
for i, tt := range tests {
c := Cluster{}
err := c.AddSlice(tt.mems)
if err != nil {
t.Error(err)
}
g := Cluster{}
g.Set(tt.f)
if g.String() != c.String() {
t.Errorf("#%d: set = %v, want %v", i, g, c)
}
}
}
func TestClusterSetBad(t *testing.T) {
tests := []string{
"mem1=,mem2=128.193.4.20:2379,mem3=10.0.0.2:2379",
"mem1,mem2=128.193.4.20:2379,mem3=10.0.0.2:2379",
// TODO(philips): anyone know of a 64 bit sha1 hash collision
// "06b2f82fd81b2c20=128.193.4.20:2379,02c60cb75083ceef=128.193.4.20:2379",
}
for i, tt := range tests {
g := Cluster{}
err := g.Set(tt)
if err == nil {
t.Errorf("#%d: set = %v, want err", i, tt)
}
}
}
func TestClusterAddBad(t *testing.T) {
tests := []struct {
mems []Member
}{
{
[]Member{
{ID: 1, Name: "mem1"},
{ID: 1, Name: "mem2"},
},
},
}
c := &Cluster{}
c.Add(Member{ID: 1, Name: "mem1"})
for i, tt := range tests {
for _, m := range tt.mems {
err := c.Add(m)
if err == nil {
t.Errorf("#%d: set = %v, want err", i, m)
}
}
}
}

View File

@ -35,12 +35,12 @@ const (
var errClosed = errors.New("etcdhttp: client closed connection")
// NewClientHandler generates a muxed http.Handler with the given parameters to serve etcd client requests.
func NewClientHandler(server *etcdserver.EtcdServer, peers Peers, timeout time.Duration) http.Handler {
func NewClientHandler(server *etcdserver.EtcdServer, clusterStore etcdserver.ClusterStore, timeout time.Duration) http.Handler {
sh := &serverHandler{
server: server,
peers: peers,
timer: server,
timeout: timeout,
server: server,
clusterStore: clusterStore,
timer: server,
timeout: timeout,
}
if sh.timeout == 0 {
sh.timeout = defaultServerTimeout
@ -68,10 +68,10 @@ func NewPeerHandler(server etcdserver.Server) http.Handler {
// serverHandler provides http.Handlers for etcd client and raft communication.
type serverHandler struct {
timeout time.Duration
server etcdserver.Server
timer etcdserver.RaftTimer
peers Peers
timeout time.Duration
server etcdserver.Server
timer etcdserver.RaftTimer
clusterStore etcdserver.ClusterStore
}
func (h serverHandler) serveKeys(w http.ResponseWriter, r *http.Request) {
@ -115,7 +115,7 @@ func (h serverHandler) serveMachines(w http.ResponseWriter, r *http.Request) {
if !allowMethod(w, r.Method, "GET", "HEAD") {
return
}
endpoints := h.peers.Endpoints()
endpoints := h.clusterStore.Get().Endpoints()
w.Write([]byte(strings.Join(endpoints, ", ")))
}

View File

@ -589,7 +589,7 @@ func TestV2MachinesEndpoint(t *testing.T) {
{"POST", http.StatusMethodNotAllowed},
}
m := NewClientHandler(nil, Peers{}, time.Hour)
m := NewClientHandler(nil, &fakeCluster{}, time.Hour)
s := httptest.NewServer(m)
defer s.Close()
@ -610,15 +610,20 @@ func TestV2MachinesEndpoint(t *testing.T) {
}
func TestServeMachines(t *testing.T) {
peers := Peers{}
peers.Set("0xBEEF0=localhost:8080&0xBEEF1=localhost:8081&0xBEEF2=localhost:8082")
cluster := &fakeCluster{
members: []etcdserver.Member{
{ID: 0xBEEF0, PeerURLs: []string{"localhost:8080"}},
{ID: 0xBEEF1, PeerURLs: []string{"localhost:8081"}},
{ID: 0xBEEF2, PeerURLs: []string{"localhost:8082"}},
},
}
writer := httptest.NewRecorder()
req, err := http.NewRequest("GET", "", nil)
if err != nil {
t.Fatal(err)
}
h := &serverHandler{peers: peers}
h := &serverHandler{clusterStore: cluster}
h.serveMachines(writer, req)
w := "http://localhost:8080, http://localhost:8081, http://localhost:8082"
if g := writer.Body.String(); g != w {
@ -629,56 +634,64 @@ func TestServeMachines(t *testing.T) {
}
}
func TestPeersEndpoints(t *testing.T) {
func TestClusterGetEndpoints(t *testing.T) {
tests := []struct {
peers Peers
endpoints []string
clusterStore etcdserver.ClusterStore
endpoints []string
}{
// single peer with a single address
{
peers: Peers(map[int64][]string{
1: []string{"192.0.2.1"},
}),
clusterStore: &fakeCluster{
members: []etcdserver.Member{
{ID: 1, PeerURLs: []string{"192.0.2.1"}},
},
},
endpoints: []string{"http://192.0.2.1"},
},
// single peer with a single address with a port
{
peers: Peers(map[int64][]string{
1: []string{"192.0.2.1:8001"},
}),
clusterStore: &fakeCluster{
members: []etcdserver.Member{
{ID: 1, PeerURLs: []string{"192.0.2.1:8001"}},
},
},
endpoints: []string{"http://192.0.2.1:8001"},
},
// several peers explicitly unsorted
// several members explicitly unsorted
{
peers: Peers(map[int64][]string{
2: []string{"192.0.2.3", "192.0.2.4"},
3: []string{"192.0.2.5", "192.0.2.6"},
1: []string{"192.0.2.1", "192.0.2.2"},
}),
clusterStore: &fakeCluster{
members: []etcdserver.Member{
{ID: 2, PeerURLs: []string{"192.0.2.3", "192.0.2.4"}},
{ID: 3, PeerURLs: []string{"192.0.2.5", "192.0.2.6"}},
{ID: 1, PeerURLs: []string{"192.0.2.1", "192.0.2.2"}},
},
},
endpoints: []string{"http://192.0.2.1", "http://192.0.2.2", "http://192.0.2.3", "http://192.0.2.4", "http://192.0.2.5", "http://192.0.2.6"},
},
// no peers
// no members
{
peers: Peers(map[int64][]string{}),
endpoints: []string{},
clusterStore: &fakeCluster{members: []etcdserver.Member{}},
endpoints: []string{},
},
// peer with no endpoints
{
peers: Peers(map[int64][]string{
3: []string{},
}),
clusterStore: &fakeCluster{
members: []etcdserver.Member{
{ID: 3, PeerURLs: []string{}},
},
},
endpoints: []string{},
},
}
for i, tt := range tests {
endpoints := tt.peers.Endpoints()
endpoints := tt.clusterStore.Get().Endpoints()
if !reflect.DeepEqual(tt.endpoints, endpoints) {
t.Errorf("#%d: peers.Endpoints() incorrect: want=%#v got=%#v", i, tt.endpoints, endpoints)
t.Errorf("#%d: members.Endpoints() incorrect: want=%#v got=%#v", i, tt.endpoints, endpoints)
}
}
}
@ -868,7 +881,6 @@ func TestServeRaft(t *testing.T) {
h := &serverHandler{
timeout: time.Hour,
server: &errServer{tt.serverErr},
peers: nil,
}
rw := httptest.NewRecorder()
h.serveRaft(rw, req)
@ -957,7 +969,6 @@ func TestBadServeKeys(t *testing.T) {
h := &serverHandler{
timeout: 0, // context times out immediately
server: tt.server,
peers: nil,
}
rw := httptest.NewRecorder()
h.serveKeys(rw, tt.req)
@ -980,7 +991,6 @@ func TestServeKeysEvent(t *testing.T) {
h := &serverHandler{
timeout: time.Hour,
server: server,
peers: nil,
timer: &dummyRaftTimer{},
}
rw := httptest.NewRecorder()
@ -1019,7 +1029,6 @@ func TestServeKeysWatch(t *testing.T) {
h := &serverHandler{
timeout: time.Hour,
server: server,
peers: nil,
timer: &dummyRaftTimer{},
}
go func() {
@ -1295,3 +1304,15 @@ func TestHandleWatchStreaming(t *testing.T) {
t.Fatalf("timed out waiting for done")
}
}
type fakeCluster struct {
members []etcdserver.Member
}
func (c *fakeCluster) Get() etcdserver.Cluster {
cl := &etcdserver.Cluster{}
cl.AddSlice(c.members)
return *cl
}
func (c *fakeCluster) Delete(id int64) { return }

View File

@ -1,157 +0,0 @@
package etcdhttp
import (
"bytes"
"fmt"
"log"
"math/rand"
"net/http"
"net/url"
"sort"
"strconv"
"github.com/coreos/etcd/raft/raftpb"
)
// Peers contains a mapping of unique IDs to a list of hostnames/IP addresses
type Peers map[int64][]string
// addScheme adds the protocol prefix to a string; currently only HTTP
// TODO: improve this when implementing TLS
func addScheme(addr string) string {
return fmt.Sprintf("http://%s", addr)
}
// Pick returns a random address from a given Peer's addresses. If the
// given peer does not exist, an empty string is returned.
func (ps Peers) Pick(id int64) string {
addrs := ps[id]
if len(addrs) == 0 {
return ""
}
return addrs[rand.Intn(len(addrs))]
}
// Set parses command line sets of names to IPs formatted like:
// a=1.1.1.1&a=1.1.1.2&b=2.2.2.2
func (ps *Peers) Set(s string) error {
m := make(map[int64][]string)
v, err := url.ParseQuery(s)
if err != nil {
return err
}
for k, v := range v {
id, err := strconv.ParseInt(k, 0, 64)
if err != nil {
return err
}
m[id] = v
}
*ps = m
return nil
}
func (ps *Peers) String() string {
v := url.Values{}
for k, vv := range *ps {
for i := range vv {
v.Add(strconv.FormatInt(k, 16), vv[i])
}
}
return v.Encode()
}
func (ps Peers) IDs() []int64 {
var ids []int64
for id := range ps {
ids = append(ids, id)
}
return ids
}
// Endpoints returns a list of all peer addresses. Each address is prefixed
// with the scheme (currently "http://"). The returned list is sorted in
// ascending lexicographical order.
func (ps Peers) Endpoints() []string {
endpoints := make([]string, 0)
for _, addrs := range ps {
for _, addr := range addrs {
endpoints = append(endpoints, addScheme(addr))
}
}
sort.Strings(endpoints)
return endpoints
}
// Addrs returns a list of all peer addresses. The returned list is sorted
// in ascending lexicographical order.
func (ps Peers) Addrs() []string {
addrs := make([]string, 0)
for _, paddrs := range ps {
for _, paddr := range paddrs {
addrs = append(addrs, paddr)
}
}
sort.Strings(addrs)
return addrs
}
func Sender(t *http.Transport, p Peers) func(msgs []raftpb.Message) {
c := &http.Client{Transport: t}
scheme := "http"
if t.TLSClientConfig != nil {
scheme = "https"
}
return func(msgs []raftpb.Message) {
for _, m := range msgs {
// TODO: reuse go routines
// limit the number of outgoing connections for the same receiver
go send(c, scheme, p, m)
}
}
}
func send(c *http.Client, scheme string, p Peers, m raftpb.Message) {
// TODO (xiangli): reasonable retry logic
for i := 0; i < 3; i++ {
addr := p.Pick(m.To)
if addr == "" {
// TODO: unknown peer id.. what do we do? I
// don't think his should ever happen, need to
// look into this further.
log.Printf("etcdhttp: no addr for %d", m.To)
return
}
url := fmt.Sprintf("%s://%s%s", scheme, addr, raftPrefix)
// TODO: don't block. we should be able to have 1000s
// of messages out at a time.
data, err := m.Marshal()
if err != nil {
log.Println("etcdhttp: dropping message:", err)
return // drop bad message
}
if httpPost(c, url, data) {
return // success
}
// TODO: backoff
}
}
func httpPost(c *http.Client, url string, data []byte) bool {
resp, err := c.Post(url, "application/protobuf", bytes.NewBuffer(data))
if err != nil {
// TODO: log the error?
return false
}
resp.Body.Close()
if resp.StatusCode != http.StatusNoContent {
// TODO: log the error?
return false
}
return true
}

View File

@ -1,248 +0,0 @@
package etcdhttp
import (
"net/http"
"net/http/httptest"
"reflect"
"sort"
"strings"
"testing"
"github.com/coreos/etcd/raft/raftpb"
)
func TestPeers(t *testing.T) {
tests := []struct {
in string
wids []int64
wep []string
waddrs []string
wstring string
}{
{
"1=1.1.1.1",
[]int64{1},
[]string{"http://1.1.1.1"},
[]string{"1.1.1.1"},
"1=1.1.1.1",
},
{
"2=2.2.2.2",
[]int64{2},
[]string{"http://2.2.2.2"},
[]string{"2.2.2.2"},
"2=2.2.2.2",
},
{
"1=1.1.1.1&1=1.1.1.2&2=2.2.2.2",
[]int64{1, 2},
[]string{"http://1.1.1.1", "http://1.1.1.2", "http://2.2.2.2"},
[]string{"1.1.1.1", "1.1.1.2", "2.2.2.2"},
"1=1.1.1.1&1=1.1.1.2&2=2.2.2.2",
},
{
"3=3.3.3.3&4=4.4.4.4&1=1.1.1.1&1=1.1.1.2&2=2.2.2.2",
[]int64{1, 2, 3, 4},
[]string{"http://1.1.1.1", "http://1.1.1.2", "http://2.2.2.2",
"http://3.3.3.3", "http://4.4.4.4"},
[]string{"1.1.1.1", "1.1.1.2", "2.2.2.2", "3.3.3.3", "4.4.4.4"},
"1=1.1.1.1&1=1.1.1.2&2=2.2.2.2&3=3.3.3.3&4=4.4.4.4",
},
}
for i, tt := range tests {
p := &Peers{}
err := p.Set(tt.in)
if err != nil {
t.Errorf("#%d: err=%v, want nil", i, err)
}
ids := int64Slice(p.IDs())
sort.Sort(ids)
if !reflect.DeepEqual([]int64(ids), tt.wids) {
t.Errorf("#%d: IDs=%#v, want %#v", i, []int64(ids), tt.wids)
}
ep := p.Endpoints()
if !reflect.DeepEqual(ep, tt.wep) {
t.Errorf("#%d: Endpoints=%#v, want %#v", i, ep, tt.wep)
}
addrs := p.Addrs()
if !reflect.DeepEqual(addrs, tt.waddrs) {
t.Errorf("#%d: addrs=%#v, want %#v", i, ep, tt.waddrs)
}
s := p.String()
if s != tt.wstring {
t.Errorf("#%d: string=%q, want %q", i, s, tt.wstring)
}
}
}
type int64Slice []int64
func (p int64Slice) Len() int { return len(p) }
func (p int64Slice) Less(i, j int) bool { return p[i] < p[j] }
func (p int64Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
func TestPeersSetBad(t *testing.T) {
tests := []string{
// garbage URL
"asdf%%",
// non-int64 keys
"a=1.2.3.4",
"-1-23=1.2.3.4",
}
for i, tt := range tests {
p := &Peers{}
if err := p.Set(tt); err == nil {
t.Errorf("#%d: err=nil unexpectedly", i)
}
}
}
func TestPeersPick(t *testing.T) {
ps := &Peers{
1: []string{"abc", "def", "ghi", "jkl", "mno", "pqr", "stu"},
2: []string{"xyz"},
3: []string{},
}
ids := map[string]bool{
"abc": true,
"def": true,
"ghi": true,
"jkl": true,
"mno": true,
"pqr": true,
"stu": true,
}
for i := 0; i < 1000; i++ {
a := ps.Pick(1)
if _, ok := ids[a]; !ok {
t.Errorf("returned ID %q not in expected range!", a)
break
}
}
if b := ps.Pick(2); b != "xyz" {
t.Errorf("id=%q, want %q", b, "xyz")
}
if c := ps.Pick(3); c != "" {
t.Errorf("id=%q, want \"\"", c)
}
}
func TestHttpPost(t *testing.T) {
var tr *http.Request
tests := []struct {
h http.HandlerFunc
w bool
}{
{
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
tr = r
w.WriteHeader(http.StatusNoContent)
}),
true,
},
{
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
tr = r
w.WriteHeader(http.StatusNotFound)
}),
false,
},
{
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
tr = r
w.WriteHeader(http.StatusInternalServerError)
}),
false,
},
}
for i, tt := range tests {
ts := httptest.NewServer(tt.h)
if g := httpPost(http.DefaultClient, ts.URL, []byte("adsf")); g != tt.w {
t.Errorf("#%d: httpPost()=%t, want %t", i, g, tt.w)
}
if tr.Method != "POST" {
t.Errorf("#%d: Method=%q, want %q", i, tr.Method, "POST")
}
if ct := tr.Header.Get("Content-Type"); ct != "application/protobuf" {
t.Errorf("#%d: Content-Type=%q, want %q", i, ct, "application/protobuf")
}
tr = nil
ts.Close()
}
if httpPost(http.DefaultClient, "garbage url", []byte("data")) {
t.Errorf("httpPost with bad URL returned true unexpectedly!")
}
}
func TestSend(t *testing.T) {
var tr *http.Request
var rc int
h := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
tr = r
w.WriteHeader(rc)
})
tests := []struct {
m raftpb.Message
code int
ok bool
}{
{
// all good
raftpb.Message{
To: 42,
Type: 4,
},
http.StatusNoContent,
true,
},
{
// bad response from server should be silently ignored
raftpb.Message{
To: 42,
Type: 2,
},
http.StatusInternalServerError,
true,
},
{
// unknown destination!
raftpb.Message{
To: 3,
Type: 2,
},
0,
false,
},
}
for i, tt := range tests {
tr = nil
rc = tt.code
ts := httptest.NewServer(h)
ps := Peers{
42: []string{strings.TrimPrefix(ts.URL, "http://")},
}
send(http.DefaultClient, "http", ps, tt.m)
if !tt.ok {
if tr != nil {
t.Errorf("#%d: got request=%#v, want nil", i, tr)
}
ts.Close()
continue
}
if tr.Method != "POST" {
t.Errorf("#%d: Method=%q, want %q", i, tr.Method, "POST")
}
if ct := tr.Header.Get("Content-Type"); ct != "application/protobuf" {
t.Errorf("#%d: Content-Type=%q, want %q", i, ct, "application/protobuf")
}
if tr.URL.String() != "/raft" {
t.Errorf("#%d: URL=%q, want %q", i, tr.URL.String(), "/raft")
}
ts.Close()
}
}

43
etcdserver/member.go Normal file
View File

@ -0,0 +1,43 @@
package etcdserver
import (
"crypto/sha1"
"encoding/binary"
"path"
"sort"
"strconv"
)
const machineKVPrefix = "/_etcd/machines/"
type Member struct {
ID int64
Name string
// TODO(philips): ensure these are URLs
PeerURLs []string
ClientURLs []string
}
// newMember creates a Member without an ID and generates one based on the
// name, peer URLs. This is used for bootstrapping.
func newMember(name string, peerURLs []string) *Member {
sort.Strings(peerURLs)
m := &Member{Name: name, PeerURLs: peerURLs}
b := []byte(m.Name)
for _, p := range m.PeerURLs {
b = append(b, []byte(p)...)
}
hash := sha1.Sum(b)
m.ID = int64(binary.BigEndian.Uint64(hash[:8]))
if m.ID < 0 {
m.ID = m.ID * -1
}
return m
}
func (m Member) storeKey() string {
return path.Join(machineKVPrefix, strconv.FormatUint(uint64(m.ID), 16))
}

View File

@ -80,7 +80,7 @@ type EtcdServer struct {
Node raft.Node
Store store.Store
// Send specifies the send function for sending msgs to peers. Send
// Send specifies the send function for sending msgs to members. Send
// MUST NOT block. It is okay to drop messages, since clients should
// timeout and reissue their messages. If Send is nil, server will
// panic.
@ -94,8 +94,9 @@ type EtcdServer struct {
SnapCount int64 // number of entries to trigger a snapshot
// Cache of the latest raft index and raft term the server has seen
raftIndex int64
raftTerm int64
raftIndex int64
raftTerm int64
ClusterStore ClusterStore
}
// Start prepares and starts server in a new goroutine. It is no longer safe to
@ -107,6 +108,8 @@ func (s *EtcdServer) Start() {
}
s.w = wait.New()
s.done = make(chan struct{})
// TODO: if this is an empty log, writes all peer infos
// into the first entry
go s.run()
}
@ -130,6 +133,7 @@ func (s *EtcdServer) run() {
// TODO(bmizerany): do this in the background, but take
// care to apply entries in a single goroutine, and not
// race them.
// TODO: apply configuration change into ClusterStore.
for _, e := range rd.CommittedEntries {
switch e.Type {
case raftpb.EntryNormal:

View File

@ -382,14 +382,14 @@ func testServer(t *testing.T, ns int64) {
}
}
peers := make([]int64, ns)
members := make([]int64, ns)
for i := int64(0); i < ns; i++ {
peers[i] = i + 1
members[i] = i + 1
}
for i := int64(0); i < ns; i++ {
id := i + 1
n := raft.StartNode(id, peers, 10, 1)
n := raft.StartNode(id, members, 10, 1)
tk := time.NewTicker(10 * time.Millisecond)
defer tk.Stop()
srv := &EtcdServer{

43
main.go
View File

@ -36,14 +36,14 @@ const (
)
var (
fid = flag.String("id", "0x1", "ID of this server")
name = flag.String("name", "default", "Unique human-readable name for this node")
timeout = flag.Duration("timeout", 10*time.Second, "Request Timeout")
paddr = flag.String("peer-bind-addr", ":7001", "Peer service address (e.g., ':7001')")
dir = flag.String("data-dir", "", "Path to the data directory")
snapCount = flag.Int64("snapshot-count", etcdserver.DefaultSnapCount, "Number of committed transactions to trigger a snapshot")
printVersion = flag.Bool("version", false, "Print the version and exit")
peers = &etcdhttp.Peers{}
cluster = &etcdserver.Cluster{}
addrs = &Addrs{}
cors = &pkg.CORSInfo{}
proxyFlag = new(ProxyFlag)
@ -78,11 +78,11 @@ var (
)
func init() {
flag.Var(peers, "peers", "your peers")
flag.Var(cluster, "bootstrap-config", "Initial cluster configuration for bootstrapping")
flag.Var(addrs, "bind-addr", "List of HTTP service addresses (e.g., '127.0.0.1:4001,10.0.0.1:8080')")
flag.Var(cors, "cors", "Comma-separated white list of origins for CORS (cross-origin resource sharing).")
flag.Var(proxyFlag, "proxy", fmt.Sprintf("Valid values include %s", strings.Join(proxyFlagValues, ", ")))
peers.Set("0x1=localhost:8080")
cluster.Set("default=localhost:8080")
addrs.Set("127.0.0.1:4001")
proxyFlag.Set(proxyFlagValueOff)
@ -122,16 +122,13 @@ func main() {
// startEtcd launches the etcd server and HTTP handlers for client/server communication.
func startEtcd() {
id, err := strconv.ParseInt(*fid, 0, 64)
if err != nil {
log.Fatal(err)
}
if id == raft.None {
log.Fatalf("etcd: cannot use None(%d) as etcdserver id", raft.None)
self := cluster.FindName(*name)
if self == nil {
log.Fatalf("etcd: no member with name=%q exists", *name)
}
if peers.Pick(id) == "" {
log.Fatalf("%#x=<addr> must be specified in peers", id)
if self.ID == raft.None {
log.Fatalf("etcd: cannot use None(%d) as member id", raft.None)
}
if *snapCount <= 0 {
@ -139,7 +136,7 @@ func startEtcd() {
}
if *dir == "" {
*dir = fmt.Sprintf("%v_etcd_data", *fid)
*dir = fmt.Sprintf("%v_etcd_data", self.ID)
log.Printf("main: no data-dir is given, using default data-dir ./%s", *dir)
}
if err := os.MkdirAll(*dir, privateDirMode); err != nil {
@ -154,6 +151,7 @@ func startEtcd() {
waldir := path.Join(*dir, "wal")
var w *wal.WAL
var n raft.Node
var err error
st := store.New()
if !wal.Exist(waldir) {
@ -161,7 +159,7 @@ func startEtcd() {
if err != nil {
log.Fatal(err)
}
n = raft.StartNode(id, peers.IDs(), 10, 1)
n = raft.StartNode(self.ID, cluster.IDs(), 10, 1)
} else {
var index int64
snapshot, err := snapshotter.Load()
@ -186,7 +184,7 @@ func startEtcd() {
if wid != 0 {
log.Fatalf("unexpected nodeid %d: nodeid should always be zero until we save nodeid into wal", wid)
}
n = raft.RestartNode(id, peers.IDs(), 10, 1, snapshot, st, ents)
n = raft.RestartNode(self.ID, cluster.IDs(), 10, 1, snapshot, st, ents)
}
pt, err := transport.NewTransport(peerTLSInfo)
@ -194,6 +192,8 @@ func startEtcd() {
log.Fatal(err)
}
cls := etcdserver.NewClusterStore(st, *cluster)
s := &etcdserver.EtcdServer{
Store: st,
Node: n,
@ -201,15 +201,16 @@ func startEtcd() {
*wal.WAL
*snap.Snapshotter
}{w, snapshotter},
Send: etcdhttp.Sender(pt, *peers),
Ticker: time.Tick(100 * time.Millisecond),
SyncTicker: time.Tick(500 * time.Millisecond),
SnapCount: *snapCount,
Send: etcdserver.Sender(pt, cls),
Ticker: time.Tick(100 * time.Millisecond),
SyncTicker: time.Tick(500 * time.Millisecond),
SnapCount: *snapCount,
ClusterStore: cls,
}
s.Start()
ch := &pkg.CORSHandler{
Handler: etcdhttp.NewClientHandler(s, *peers, *timeout),
Handler: etcdhttp.NewClientHandler(s, cls, *timeout),
Info: cors,
}
ph := etcdhttp.NewPeerHandler(s)
@ -247,7 +248,7 @@ func startProxy() {
log.Fatal(err)
}
ph, err := proxy.NewHandler(pt, (*peers).Addrs())
ph, err := proxy.NewHandler(pt, (*cluster).Endpoints())
if err != nil {
log.Fatal(err)
}