etcdserver/etcdhttp: split out peers and add tests

release-2.0
Jonathan Boulle 2014-09-11 16:13:45 -07:00
parent c03798f99b
commit 5f66b35852
3 changed files with 295 additions and 135 deletions

View File

@ -1,7 +1,6 @@
package etcdhttp
import (
"bytes"
"encoding/binary"
"encoding/json"
"errors"
@ -11,13 +10,11 @@ import (
"log"
"net/http"
"net/url"
"sort"
"strconv"
"strings"
"time"
crand "crypto/rand"
"math/rand"
"github.com/coreos/etcd/elog"
etcdErr "github.com/coreos/etcd/error"
@ -31,131 +28,12 @@ import (
const (
keysPrefix = "/v2/keys"
machinesPrefix = "/v2/machines"
DefaultTimeout = 500 * time.Millisecond
)
type Peers map[int64][]string
func (ps Peers) Pick(id int64) string {
addrs := ps[id]
if len(addrs) == 0 {
return ""
}
return addScheme(addrs[rand.Intn(len(addrs))])
}
// TODO: improve this when implementing TLS
func addScheme(addr string) string {
return fmt.Sprintf("http://%s", addr)
}
// Set parses command line sets of names to ips formatted like:
// a=1.1.1.1&a=1.1.1.2&b=2.2.2.2
func (ps *Peers) Set(s string) error {
m := make(map[int64][]string)
v, err := url.ParseQuery(s)
if err != nil {
return err
}
for k, v := range v {
id, err := strconv.ParseInt(k, 0, 64)
if err != nil {
return err
}
m[id] = v
}
*ps = m
return nil
}
func (ps *Peers) String() string {
v := url.Values{}
for k, vv := range *ps {
for i := range vv {
v.Add(strconv.FormatInt(k, 16), vv[i])
}
}
return v.Encode()
}
func (ps Peers) IDs() []int64 {
var ids []int64
for id := range ps {
ids = append(ids, id)
}
return ids
}
// Endpoints returns a list of all peer addresses. Each address is
// prefixed with "http://". The returned list is sorted (asc).
func (ps Peers) Endpoints() []string {
endpoints := make([]string, 0)
for _, addrs := range ps {
for _, addr := range addrs {
endpoints = append(endpoints, addScheme(addr))
}
}
sort.Strings(endpoints)
return endpoints
}
var errClosed = errors.New("etcdhttp: client closed connection")
const DefaultTimeout = 500 * time.Millisecond
func Sender(p Peers) func(msgs []raftpb.Message) {
return func(msgs []raftpb.Message) {
for _, m := range msgs {
// TODO: reuse go routines
// limit the number of outgoing connections for the same receiver
go send(p, m)
}
}
}
func send(p Peers, m raftpb.Message) {
// TODO (xiangli): reasonable retry logic
for i := 0; i < 3; i++ {
url := p.Pick(m.To)
if url == "" {
// TODO: unknown peer id.. what do we do? I
// don't think his should ever happen, need to
// look into this further.
log.Println("etcdhttp: no addr for %d", m.To)
return
}
url += "/raft"
// TODO: don't block. we should be able to have 1000s
// of messages out at a time.
data, err := m.Marshal()
if err != nil {
log.Println("etcdhttp: dropping message:", err)
return // drop bad message
}
if httpPost(url, data) {
return // success
}
// TODO: backoff
}
}
func httpPost(url string, data []byte) bool {
// TODO: set timeouts
resp, err := http.Post(url, "application/protobuf", bytes.NewBuffer(data))
if err != nil {
elog.TODO()
return false
}
resp.Body.Close()
if resp.StatusCode != 200 {
elog.TODO()
return false
}
return true
}
// Handler implements the http.Handler interface and serves etcd client and
// raft communication.
type Handler struct {

View File

@ -0,0 +1,139 @@
package etcdhttp
import (
"bytes"
"fmt"
"log"
"math/rand"
"net/http"
"net/url"
"sort"
"strconv"
"github.com/coreos/etcd/elog"
"github.com/coreos/etcd/raft/raftpb"
)
// Peers contains a mapping of unique IDs to a list of hostnames/IP addresses
type Peers map[int64][]string
// addScheme adds the protocol prefix to a string; currently only HTTP
// TODO: improve this when implementing TLS
func addScheme(addr string) string {
return fmt.Sprintf("http://%s", addr)
}
// Pick chooses a random address from a given Peer's addresses, and returns it as
// an addressible URI. If the given peer does not exist, an empty string is returned.
func (ps Peers) Pick(id int64) string {
addrs := ps[id]
if len(addrs) == 0 {
return ""
}
return addScheme(addrs[rand.Intn(len(addrs))])
}
// Set parses command line sets of names to IPs formatted like:
// a=1.1.1.1&a=1.1.1.2&b=2.2.2.2
func (ps *Peers) Set(s string) error {
m := make(map[int64][]string)
v, err := url.ParseQuery(s)
if err != nil {
return err
}
for k, v := range v {
id, err := strconv.ParseInt(k, 0, 64)
if err != nil {
return err
}
m[id] = v
}
*ps = m
return nil
}
func (ps *Peers) String() string {
v := url.Values{}
for k, vv := range *ps {
for i := range vv {
v.Add(strconv.FormatInt(k, 16), vv[i])
}
}
return v.Encode()
}
func (ps Peers) IDs() []int64 {
var ids []int64
for id := range ps {
ids = append(ids, id)
}
return ids
}
// Endpoints returns a list of all peer addresses. Each address is prefixed
// with the scheme (currently "http://"). The returned list is sorted in
// ascending lexicographical order.
func (ps Peers) Endpoints() []string {
endpoints := make([]string, 0)
for _, addrs := range ps {
for _, addr := range addrs {
endpoints = append(endpoints, addScheme(addr))
}
}
sort.Strings(endpoints)
return endpoints
}
func Sender(p Peers) func(msgs []raftpb.Message) {
return func(msgs []raftpb.Message) {
for _, m := range msgs {
// TODO: reuse go routines
// limit the number of outgoing connections for the same receiver
go send(p, m)
}
}
}
func send(p Peers, m raftpb.Message) {
// TODO (xiangli): reasonable retry logic
for i := 0; i < 3; i++ {
url := p.Pick(m.To)
if url == "" {
// TODO: unknown peer id.. what do we do? I
// don't think his should ever happen, need to
// look into this further.
log.Println("etcdhttp: no addr for %d", m.To)
return
}
url += "/raft"
// TODO: don't block. we should be able to have 1000s
// of messages out at a time.
data, err := m.Marshal()
if err != nil {
log.Println("etcdhttp: dropping message:", err)
return // drop bad message
}
if httpPost(url, data) {
return // success
}
// TODO: backoff
}
}
func httpPost(url string, data []byte) bool {
// TODO: set timeouts
resp, err := http.Post(url, "application/protobuf", bytes.NewBuffer(data))
if err != nil {
elog.TODO()
return false
}
resp.Body.Close()
if resp.StatusCode != 200 {
elog.TODO()
return false
}
return true
}

View File

@ -1,19 +1,162 @@
package etcdhttp
import "testing"
import (
"net/http"
"net/http/httptest"
"reflect"
"sort"
"testing"
)
//TODO: full testing for peer set
func TestPeerSet(t *testing.T) {
p := &Peers{}
tests := []string{
"1=1.1.1.1",
"2=2.2.2.2",
"1=1.1.1.1&1=1.1.1.2&2=2.2.2.2",
func TestPeers(t *testing.T) {
tests := []struct {
in string
wids []int64
wep []string
wstring string
}{
{
"1=1.1.1.1",
[]int64{1},
[]string{"http://1.1.1.1"},
"1=1.1.1.1",
},
{
"2=2.2.2.2",
[]int64{2},
[]string{"http://2.2.2.2"},
"2=2.2.2.2",
},
{
"1=1.1.1.1&1=1.1.1.2&2=2.2.2.2",
[]int64{1, 2},
[]string{"http://1.1.1.1", "http://1.1.1.2", "http://2.2.2.2"},
"1=1.1.1.1&1=1.1.1.2&2=2.2.2.2",
},
{
"3=3.3.3.3&4=4.4.4.4&1=1.1.1.1&1=1.1.1.2&2=2.2.2.2",
[]int64{1, 2, 3, 4},
[]string{"http://1.1.1.1", "http://1.1.1.2", "http://2.2.2.2",
"http://3.3.3.3", "http://4.4.4.4"},
"1=1.1.1.1&1=1.1.1.2&2=2.2.2.2&3=3.3.3.3&4=4.4.4.4",
},
}
for i, tt := range tests {
p.Set(tt)
if p.String() != tt {
t.Errorf("#%d: string = %s, want %s", i, p.String(), tt)
p := &Peers{}
err := p.Set(tt.in)
if err != nil {
t.Errorf("#%d: err=%v, want nil", i, err)
}
ids := p.IDs()
sortint64(ids)
if !reflect.DeepEqual(ids, tt.wids) {
t.Errorf("#%d: IDs=%#v, want %#v", i, ids, tt.wids)
}
ep := p.Endpoints()
if !reflect.DeepEqual(ep, tt.wep) {
t.Errorf("#%d: Endpoints=%#v, want %#v", i, ep, tt.wep)
}
s := p.String()
if s != tt.wstring {
t.Errorf("#%d: string=%q, want %q", i, s, tt.wstring)
}
}
}
func sortint64(list []int64) {
sorted := make(sort.IntSlice, len(list))
for i, j := range list {
sorted[i] = int(j)
}
sorted.Sort()
for i, j := range sorted {
list[i] = int64(j)
}
}
func TestPeersSetBad(t *testing.T) {
tests := []string{
// garbage URL
"asdf%%",
// non-int64 keys
"a=1.2.3.4",
"-1-23=1.2.3.4",
}
for i, tt := range tests {
p := &Peers{}
if err := p.Set(tt); err == nil {
t.Errorf("#%d: err=nil unexpectedly", i)
}
}
}
func TestPeersPick(t *testing.T) {
ps := &Peers{
1: []string{"abc", "def", "ghi", "jkl", "mno", "pqr", "stu"},
2: []string{"xyz"},
3: []string{},
}
ids := map[string]bool{
"http://abc": true,
"http://def": true,
"http://ghi": true,
"http://jkl": true,
"http://mno": true,
"http://pqr": true,
"http://stu": true,
}
for i := 0; i < 1000; i++ {
a := ps.Pick(1)
if _, ok := ids[a]; !ok {
t.Errorf("returned ID %q not in expected range!", a)
break
}
}
if b := ps.Pick(2); b != "http://xyz" {
t.Errorf("id=%q, want %q", b, "http://xyz")
}
if c := ps.Pick(3); c != "" {
t.Errorf("id=%q, want \"\"", c)
}
}
func TestHttpPost(t *testing.T) {
var tr *http.Request
tests := []struct {
h http.HandlerFunc
w bool
}{
{
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
tr = r
w.WriteHeader(200)
}),
true,
},
{
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
tr = r
w.WriteHeader(404)
}),
false,
},
}
for i, tt := range tests {
ts := httptest.NewServer(tt.h)
if g := httpPost(ts.URL, []byte("adsf")); g != tt.w {
t.Errorf("#%d: httpPost()=%t, want %t", i, g, tt.w)
}
if tr.Method != "POST" {
t.Errorf("#%d: Method=%q, want %q", i, tr.Method, "POST")
}
if ct := tr.Header.Get("Content-Type"); ct != "application/protobuf" {
t.Errorf("%#d: Content-Type=%q, want %q", ct, "application/protobuf")
}
tr = nil
ts.Close()
}
if httpPost("garbage url", []byte("data")) {
t.Errorf("httpPost with bad URL returned true unexpectedly!")
}
}