diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index daabbeedf..3a6a7614d 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -102,6 +102,10 @@ "ImportPath": "github.com/stretchr/testify/assert", "Rev": "9cc77fa25329013ce07362c7742952ff887361f2" }, + { + "ImportPath": "github.com/xiang90/probing", + "Rev": "e8a0407769cb84c61c2ddf8f1d9cdae9fb489b9b" + }, { "ImportPath": "golang.org/x/crypto/bcrypt", "Rev": "1351f936d976c60a0a48d728281922cf63eafb8d" diff --git a/Godeps/_workspace/src/github.com/xiang90/probing/.gitignore b/Godeps/_workspace/src/github.com/xiang90/probing/.gitignore new file mode 100644 index 000000000..daf913b1b --- /dev/null +++ b/Godeps/_workspace/src/github.com/xiang90/probing/.gitignore @@ -0,0 +1,24 @@ +# Compiled Object files, Static and Dynamic libs (Shared Objects) +*.o +*.a +*.so + +# Folders +_obj +_test + +# Architecture specific extensions/prefixes +*.[568vq] +[568vq].out + +*.cgo1.go +*.cgo2.c +_cgo_defun.c +_cgo_gotypes.go +_cgo_export.* + +_testmain.go + +*.exe +*.test +*.prof diff --git a/Godeps/_workspace/src/github.com/xiang90/probing/LICENSE b/Godeps/_workspace/src/github.com/xiang90/probing/LICENSE new file mode 100644 index 000000000..cde8b8b05 --- /dev/null +++ b/Godeps/_workspace/src/github.com/xiang90/probing/LICENSE @@ -0,0 +1,22 @@ +The MIT License (MIT) + +Copyright (c) 2015 Xiang Li + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. + diff --git a/Godeps/_workspace/src/github.com/xiang90/probing/README.md b/Godeps/_workspace/src/github.com/xiang90/probing/README.md new file mode 100644 index 000000000..2ff682057 --- /dev/null +++ b/Godeps/_workspace/src/github.com/xiang90/probing/README.md @@ -0,0 +1,39 @@ +## Getting Started + +### Install the handler + +We first need to serve the probing HTTP handler. + +```go + http.HandleFunc("/health", probing.NewHandler()) + err := http.ListenAndServe(":12345", nil) + if err != nil { + log.Fatal("ListenAndServe: ", err) + } +``` + +### Start to probe + +Now we can start to probe the endpoint. + +``` go + id := "example" + probingInterval = 5 * time.Second + url := "http://example.com:12345/health" + p.AddHTTP(id, probingInterval, url) + + time.Sleep(13 * time.Second) + status, err := p.Status(id) + fmt.Printf("Total Probing: %d, Total Loss: %d, Estimated RTT: %v, Estimated Clock Difference: %v\n", + status.Total(), status.Loss(), status.SRTT(), status.ClockDiff()) + // Total Probing: 2, Total Loss: 0, Estimated RTT: 320.771µs, Estimated Clock Difference: -35.869µs +``` + +### TODOs: + +- TCP probing +- UDP probing +- Gossip based probing +- More accurate RTT estimation +- More accurate Clock difference estimation +- Use a clock interface rather than the real clock diff --git a/Godeps/_workspace/src/github.com/xiang90/probing/prober.go b/Godeps/_workspace/src/github.com/xiang90/probing/prober.go new file mode 100644 index 000000000..e2aa212a2 --- /dev/null +++ b/Godeps/_workspace/src/github.com/xiang90/probing/prober.go @@ -0,0 +1,112 @@ +package probing + +import ( + "encoding/json" + "errors" + "net/http" + "sync" + "time" +) + +var ( + ErrNotFound = errors.New("probing: id not found") + ErrExist = errors.New("probing: id exists") +) + +type Prober interface { + AddHTTP(id string, probingInterval time.Duration, endpoints []string) error + Remove(id string) error + Reset(id string) error + Status(id string) (Status, error) +} + +type prober struct { + mu sync.Mutex + targets map[string]*status +} + +func NewProber() Prober { + return &prober{targets: make(map[string]*status)} +} + +func (p *prober) AddHTTP(id string, probingInterval time.Duration, endpoints []string) error { + p.mu.Lock() + defer p.mu.Unlock() + if _, ok := p.targets[id]; ok { + return ErrExist + } + + s := &status{stopC: make(chan struct{})} + p.targets[id] = s + + ticker := time.NewTicker(probingInterval) + + go func() { + pinned := 0 + for { + select { + case <-ticker.C: + start := time.Now() + resp, err := http.Get(endpoints[pinned]) + if err != nil { + s.recordFailure() + pinned = (pinned + 1) % len(endpoints) + continue + } + + var hh Health + d := json.NewDecoder(resp.Body) + err = d.Decode(&hh) + resp.Body.Close() + if err != nil || !hh.OK { + s.recordFailure() + pinned = (pinned + 1) % len(endpoints) + continue + } + + s.record(time.Since(start), hh.Now) + case <-s.stopC: + ticker.Stop() + return + } + } + }() + + return nil +} + +func (p *prober) Remove(id string) error { + p.mu.Lock() + defer p.mu.Unlock() + + s, ok := p.targets[id] + if !ok { + return ErrNotFound + } + close(s.stopC) + delete(p.targets, id) + return nil +} + +func (p *prober) Reset(id string) error { + p.mu.Lock() + defer p.mu.Unlock() + + s, ok := p.targets[id] + if !ok { + return ErrNotFound + } + s.reset() + return nil +} + +func (p *prober) Status(id string) (Status, error) { + p.mu.Lock() + defer p.mu.Unlock() + + s, ok := p.targets[id] + if !ok { + return nil, ErrNotFound + } + return s, nil +} diff --git a/Godeps/_workspace/src/github.com/xiang90/probing/prober_test.go b/Godeps/_workspace/src/github.com/xiang90/probing/prober_test.go new file mode 100644 index 000000000..d80bbcaa7 --- /dev/null +++ b/Godeps/_workspace/src/github.com/xiang90/probing/prober_test.go @@ -0,0 +1,90 @@ +package probing + +import ( + "net/http/httptest" + "testing" + "time" +) + +var ( + testID = "testID" +) + +func TestProbe(t *testing.T) { + s := httptest.NewServer(NewHandler()) + + p := NewProber() + p.AddHTTP(testID, time.Millisecond, []string{s.URL}) + defer p.Remove(testID) + + time.Sleep(100 * time.Millisecond) + status, err := p.Status(testID) + if err != nil { + t.Fatalf("err = %v, want %v", err, nil) + } + if total := status.Total(); total < 50 || total > 150 { + t.Fatalf("total = %v, want around %v", total, 100) + } + if health := status.Health(); health != true { + t.Fatalf("health = %v, want %v", health, true) + } + + // become unhealthy + s.Close() + + time.Sleep(100 * time.Millisecond) + if total := status.Total(); total < 150 || total > 250 { + t.Fatalf("total = %v, want around %v", total, 200) + } + if loss := status.Loss(); loss < 50 || loss > 150 { + t.Fatalf("loss = %v, want around %v", loss, 200) + } + if health := status.Health(); health != false { + t.Fatalf("health = %v, want %v", health, false) + } +} + +func TestProbeReset(t *testing.T) { + s := httptest.NewServer(NewHandler()) + defer s.Close() + + p := NewProber() + p.AddHTTP(testID, time.Millisecond, []string{s.URL}) + defer p.Remove(testID) + + time.Sleep(100 * time.Millisecond) + status, err := p.Status(testID) + if err != nil { + t.Fatalf("err = %v, want %v", err, nil) + } + if total := status.Total(); total < 50 || total > 150 { + t.Fatalf("total = %v, want around %v", total, 100) + } + if health := status.Health(); health != true { + t.Fatalf("health = %v, want %v", health, true) + } + + p.Reset(testID) + + time.Sleep(100 * time.Millisecond) + if total := status.Total(); total < 50 || total > 150 { + t.Fatalf("total = %v, want around %v", total, 100) + } + if health := status.Health(); health != true { + t.Fatalf("health = %v, want %v", health, true) + } +} + +func TestProbeRemove(t *testing.T) { + s := httptest.NewServer(NewHandler()) + defer s.Close() + + p := NewProber() + p.AddHTTP(testID, time.Millisecond, []string{s.URL}) + + p.Remove(testID) + _, err := p.Status(testID) + if err != ErrNotFound { + t.Fatalf("err = %v, want %v", err, ErrNotFound) + } +} diff --git a/Godeps/_workspace/src/github.com/xiang90/probing/server.go b/Godeps/_workspace/src/github.com/xiang90/probing/server.go new file mode 100644 index 000000000..0e7b797d2 --- /dev/null +++ b/Godeps/_workspace/src/github.com/xiang90/probing/server.go @@ -0,0 +1,25 @@ +package probing + +import ( + "encoding/json" + "net/http" + "time" +) + +func NewHandler() http.Handler { + return &httpHealth{} +} + +type httpHealth struct { +} + +type Health struct { + OK bool + Now time.Time +} + +func (h *httpHealth) ServeHTTP(w http.ResponseWriter, r *http.Request) { + health := Health{OK: true, Now: time.Now()} + e := json.NewEncoder(w) + e.Encode(health) +} diff --git a/Godeps/_workspace/src/github.com/xiang90/probing/status.go b/Godeps/_workspace/src/github.com/xiang90/probing/status.go new file mode 100644 index 000000000..bdfab2761 --- /dev/null +++ b/Godeps/_workspace/src/github.com/xiang90/probing/status.go @@ -0,0 +1,96 @@ +package probing + +import ( + "sync" + "time" +) + +var ( + // weight factor + α = 0.125 +) + +type Status interface { + Total() int64 + Loss() int64 + Health() bool + // Estimated smoothed round trip time + SRTT() time.Duration + // Estimated clock difference + ClockDiff() time.Duration + StopNotify() <-chan struct{} +} + +type status struct { + mu sync.Mutex + srtt time.Duration + total int64 + loss int64 + health bool + clockdiff time.Duration + stopC chan struct{} +} + +// SRTT = (1-α) * SRTT + α * RTT +func (s *status) SRTT() time.Duration { + s.mu.Lock() + defer s.mu.Unlock() + return s.srtt +} + +func (s *status) Total() int64 { + s.mu.Lock() + defer s.mu.Unlock() + return s.total +} + +func (s *status) Loss() int64 { + s.mu.Lock() + defer s.mu.Unlock() + return s.loss +} + +func (s *status) Health() bool { + s.mu.Lock() + defer s.mu.Unlock() + return s.health +} + +func (s *status) ClockDiff() time.Duration { + s.mu.Lock() + defer s.mu.Unlock() + return s.clockdiff +} + +func (s *status) StopNotify() <-chan struct{} { + return s.stopC +} + +func (s *status) record(rtt time.Duration, when time.Time) { + s.mu.Lock() + defer s.mu.Unlock() + + s.total += 1 + s.health = true + s.srtt = time.Duration((1-α)*float64(s.srtt) + α*float64(rtt)) + s.clockdiff = time.Now().Sub(when) - s.srtt/2 +} + +func (s *status) recordFailure() { + s.mu.Lock() + defer s.mu.Unlock() + + s.total++ + s.health = false + s.loss += 1 +} + +func (s *status) reset() { + s.mu.Lock() + defer s.mu.Unlock() + + s.srtt = 0 + s.total = 0 + s.health = false + s.clockdiff = 0 +}