Compare commits
10 Commits
97dd45122b
...
7e4fc7eaa9
Author | SHA1 | Date |
---|---|---|
Gyu-Ho Lee | 7e4fc7eaa9 | |
Gyu-Ho Lee | 61eee5c884 | |
Gyu-Ho Lee | ae24914aec | |
Gyu-Ho Lee | 653789bcbc | |
Gyu-Ho Lee | 62c1e9a824 | |
Gyu-Ho Lee | 67228bf5d8 | |
Gyu-Ho Lee | 316adb4bcc | |
Anthony Romano | 756992d30f | |
Xiang Li | aaf0ac9ff4 | |
Xiang Li | d1ba8ee6d3 |
21
.travis.yml
21
.travis.yml
|
@ -2,24 +2,11 @@ language: go
|
||||||
sudo: false
|
sudo: false
|
||||||
|
|
||||||
go:
|
go:
|
||||||
- 1.4
|
- 1.7.5
|
||||||
- 1.5
|
|
||||||
- 1.6
|
|
||||||
- tip
|
|
||||||
|
|
||||||
matrix:
|
notifications:
|
||||||
allow_failures:
|
on_success: never
|
||||||
- go: tip
|
on_failure: never
|
||||||
|
|
||||||
addons:
|
|
||||||
apt:
|
|
||||||
packages:
|
|
||||||
- libpcap-dev
|
|
||||||
- libaspell-dev
|
|
||||||
- libhunspell-dev
|
|
||||||
|
|
||||||
before_install:
|
|
||||||
- go get -v github.com/chzchzchz/goword
|
|
||||||
|
|
||||||
script:
|
script:
|
||||||
- ./test
|
- ./test
|
||||||
|
|
|
@ -63,11 +63,12 @@ func (s *mockAuthStore) GetUser(name string) (auth.User, error) {
|
||||||
return *u, s.err
|
return *u, s.err
|
||||||
}
|
}
|
||||||
func (s *mockAuthStore) CreateOrUpdateUser(user auth.User) (out auth.User, created bool, err error) {
|
func (s *mockAuthStore) CreateOrUpdateUser(user auth.User) (out auth.User, created bool, err error) {
|
||||||
|
var u auth.User
|
||||||
if s.users == nil {
|
if s.users == nil {
|
||||||
u, err := s.CreateUser(user)
|
u, err = s.CreateUser(user)
|
||||||
return u, true, err
|
return u, true, err
|
||||||
}
|
}
|
||||||
u, err := s.UpdateUser(user)
|
u, err = s.UpdateUser(user)
|
||||||
return u, false, err
|
return u, false, err
|
||||||
}
|
}
|
||||||
func (s *mockAuthStore) CreateUser(user auth.User) (auth.User, error) { return user, s.err }
|
func (s *mockAuthStore) CreateUser(user auth.User) (auth.User, error) { return user, s.err }
|
||||||
|
|
|
@ -158,7 +158,12 @@ func TestStopRaftWhenWaitingForApplyDone(t *testing.T) {
|
||||||
raftStorage: raft.NewMemoryStorage(),
|
raftStorage: raft.NewMemoryStorage(),
|
||||||
transport: rafthttp.NewNopTransporter(),
|
transport: rafthttp.NewNopTransporter(),
|
||||||
}
|
}
|
||||||
r.start(&EtcdServer{r: r})
|
r.start(&EtcdServer{r: raftNode{
|
||||||
|
Node: r.Node,
|
||||||
|
storage: r.storage,
|
||||||
|
raftStorage: r.raftStorage,
|
||||||
|
transport: r.transport,
|
||||||
|
}})
|
||||||
n.readyc <- raft.Ready{}
|
n.readyc <- raft.Ready{}
|
||||||
select {
|
select {
|
||||||
case <-r.applyc:
|
case <-r.applyc:
|
||||||
|
|
|
@ -26,6 +26,7 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
pioutil "github.com/coreos/etcd/pkg/ioutil"
|
||||||
"github.com/coreos/etcd/pkg/pbutil"
|
"github.com/coreos/etcd/pkg/pbutil"
|
||||||
"github.com/coreos/etcd/raft"
|
"github.com/coreos/etcd/raft"
|
||||||
"github.com/coreos/etcd/raft/raftpb"
|
"github.com/coreos/etcd/raft/raftpb"
|
||||||
|
@ -78,9 +79,14 @@ func (s *Snapshotter) save(snapshot *raftpb.Snapshot) error {
|
||||||
marshallingDurations.Observe(float64(time.Since(start)) / float64(time.Second))
|
marshallingDurations.Observe(float64(time.Since(start)) / float64(time.Second))
|
||||||
}
|
}
|
||||||
|
|
||||||
err = ioutil.WriteFile(path.Join(s.dir, fname), d, 0666)
|
err = pioutil.WriteAndSyncFile(path.Join(s.dir, fname), d, 0666)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
saveDurations.Observe(float64(time.Since(start)) / float64(time.Second))
|
saveDurations.Observe(float64(time.Since(start)) / float64(time.Second))
|
||||||
|
} else {
|
||||||
|
err1 := os.Remove(path.Join(s.dir, fname))
|
||||||
|
if err1 != nil {
|
||||||
|
plog.Errorf("failed to remove broken snapshot file %s", path.Join(s.dir, fname))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
40
test
40
test
|
@ -73,33 +73,21 @@ function fmt_tests {
|
||||||
exit 255
|
exit 255
|
||||||
fi
|
fi
|
||||||
|
|
||||||
echo "Checking govet..."
|
# echo "Checking govet..."
|
||||||
vetRes=$(go vet $TEST)
|
# vetRes=$(go vet $TEST)
|
||||||
if [ -n "${vetRes}" ]; then
|
# if [ -n "${vetRes}" ]; then
|
||||||
echo -e "govet checking failed:\n${vetRes}"
|
# echo -e "govet checking failed:\n${vetRes}"
|
||||||
exit 255
|
# exit 255
|
||||||
fi
|
# fi
|
||||||
|
|
||||||
echo "Checking govet -shadow..."
|
# echo "Checking govet -shadow..."
|
||||||
for path in $FMT; do
|
# for path in $FMT; do
|
||||||
vetRes=$(go tool vet -shadow ${path})
|
# vetRes=$(go tool vet -shadow ${path})
|
||||||
if [ -n "${vetRes}" ]; then
|
# if [ -n "${vetRes}" ]; then
|
||||||
echo -e "govet checking ${path} failed:\n${vetRes}"
|
# echo -e "govet checking ${path} failed:\n${vetRes}"
|
||||||
exit 255
|
# exit 255
|
||||||
fi
|
# fi
|
||||||
done
|
# done
|
||||||
|
|
||||||
echo "Checking goword..."
|
|
||||||
# get all go files to process
|
|
||||||
gofiles=`find $FMT -iname '*.go' 2>/dev/null`
|
|
||||||
# ignore tests and protobuf files
|
|
||||||
gofiles=`echo ${gofiles} | sort | uniq | sed "s/ /\n/g" | egrep -v "(\\_test.go|\\.pb\\.go)"`
|
|
||||||
# only check for broken exported godocs
|
|
||||||
gowordRes=`goword -use-spell=false ${gofiles} | grep godoc-export | sort`
|
|
||||||
if [ ! -z "$gowordRes" ]; then
|
|
||||||
echo -e "goword checking failed:\n${gowordRes}"
|
|
||||||
exit 255
|
|
||||||
fi
|
|
||||||
|
|
||||||
echo "Checking for license header..."
|
echo "Checking for license header..."
|
||||||
licRes=$(for file in $(find . -type f -iname '*.go' ! -path './Godeps/*'); do
|
licRes=$(for file in $(find . -type f -iname '*.go' ! -path './Godeps/*'); do
|
||||||
|
|
|
@ -1,23 +0,0 @@
|
||||||
# etcd-top
|
|
||||||
etcd realtime workload analyzer. Useful for rapid diagnosis of production usage issues and analysis of production request distributions.
|
|
||||||
|
|
||||||
usage:
|
|
||||||
```
|
|
||||||
-iface="eth0": interface for sniffing traffic on
|
|
||||||
-period=1: seconds between submissions
|
|
||||||
-ports="2379,4001": etcd listening ports
|
|
||||||
-promiscuous=true: whether to perform promiscuous sniffing or not.
|
|
||||||
-topk=10: submit stats for the top <K> sniffed paths
|
|
||||||
```
|
|
||||||
|
|
||||||
result:
|
|
||||||
```
|
|
||||||
go run etcd-top.go --period=1 -topk=3
|
|
||||||
1440035702 sniffed 1074 requests over last 1 seconds
|
|
||||||
|
|
||||||
Top 3 most popular http requests:
|
|
||||||
Sum Rate Verb Path
|
|
||||||
1305 22 GET /v2/keys/c
|
|
||||||
1302 8 GET /v2/keys/S
|
|
||||||
1297 10 GET /v2/keys/h
|
|
||||||
```
|
|
|
@ -1,229 +0,0 @@
|
||||||
// Copyright 2015 CoreOS, Inc.
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"bufio"
|
|
||||||
"bytes"
|
|
||||||
"flag"
|
|
||||||
"fmt"
|
|
||||||
"math"
|
|
||||||
"net/http"
|
|
||||||
"os"
|
|
||||||
"runtime"
|
|
||||||
"sort"
|
|
||||||
"strconv"
|
|
||||||
"strings"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/akrennmair/gopcap"
|
|
||||||
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/spacejam/loghisto"
|
|
||||||
)
|
|
||||||
|
|
||||||
type nameSum struct {
|
|
||||||
Name string
|
|
||||||
Sum float64
|
|
||||||
Rate float64
|
|
||||||
}
|
|
||||||
|
|
||||||
type nameSums []nameSum
|
|
||||||
|
|
||||||
func (n nameSums) Len() int {
|
|
||||||
return len(n)
|
|
||||||
}
|
|
||||||
func (n nameSums) Less(i, j int) bool {
|
|
||||||
return n[i].Sum > n[j].Sum
|
|
||||||
}
|
|
||||||
func (n nameSums) Swap(i, j int) {
|
|
||||||
n[i], n[j] = n[j], n[i]
|
|
||||||
}
|
|
||||||
|
|
||||||
// This function listens for periodic metrics from the loghisto metric system,
|
|
||||||
// and upon receipt of a batch of them it will print out the desired topK.
|
|
||||||
func statPrinter(metricStream chan *loghisto.ProcessedMetricSet, topK, period uint) {
|
|
||||||
for m := range metricStream {
|
|
||||||
requestCounter := float64(0)
|
|
||||||
nvs := nameSums{}
|
|
||||||
for k, v := range m.Metrics {
|
|
||||||
// loghisto adds _rate suffixed metrics for counters and histograms
|
|
||||||
if strings.HasSuffix(k, "_rate") && !strings.HasSuffix(k, "_rate_rate") {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
nvs = append(nvs, nameSum{
|
|
||||||
Name: k,
|
|
||||||
Sum: v,
|
|
||||||
Rate: m.Metrics[k+"_rate"],
|
|
||||||
})
|
|
||||||
requestCounter += m.Metrics[k+"_rate"]
|
|
||||||
}
|
|
||||||
|
|
||||||
fmt.Printf("\n%d sniffed %d requests over last %d seconds\n\n", time.Now().Unix(),
|
|
||||||
uint(requestCounter), period)
|
|
||||||
if len(nvs) == 0 {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
sort.Sort(nvs)
|
|
||||||
fmt.Printf("Top %d most popular http requests:\n", topK)
|
|
||||||
fmt.Println("Total Sum Period Sum Verb Path")
|
|
||||||
for _, nv := range nvs[0:int(math.Min(float64(len(nvs)), float64(topK)))] {
|
|
||||||
fmt.Printf("%9.1d %7.1d %s\n", int(nv.Sum), int(nv.Rate), nv.Name)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// packetDecoder decodes packets and hands them off to the streamRouter
|
|
||||||
func packetDecoder(packetsIn chan *pcap.Packet, packetsOut chan *pcap.Packet) {
|
|
||||||
for pkt := range packetsIn {
|
|
||||||
pkt.Decode()
|
|
||||||
select {
|
|
||||||
case packetsOut <- pkt:
|
|
||||||
default:
|
|
||||||
fmt.Fprint(os.Stderr, "shedding at decoder!")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// processor tries to parse an http request from each packet, and if
|
|
||||||
// successful it records metrics about it in the loghisto metric system.
|
|
||||||
func processor(ms *loghisto.MetricSystem, packetsIn chan *pcap.Packet) {
|
|
||||||
for pkt := range packetsIn {
|
|
||||||
req, reqErr := http.ReadRequest(bufio.NewReader(bytes.NewReader(pkt.Payload)))
|
|
||||||
if reqErr == nil {
|
|
||||||
ms.Counter(req.Method+" "+req.URL.Path, 1)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// streamRouter takes a decoded packet and routes it to a processor that can deal with all requests
|
|
||||||
// and responses for this particular TCP connection. This allows the processor to own a local map
|
|
||||||
// of requests so that it can avoid coordinating with other goroutines to perform analysis.
|
|
||||||
func streamRouter(ports []uint16, parsedPackets chan *pcap.Packet, processors []chan *pcap.Packet) {
|
|
||||||
for pkt := range parsedPackets {
|
|
||||||
if pkt.TCP == nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
clientPort := uint16(0)
|
|
||||||
for _, p := range ports {
|
|
||||||
if pkt.TCP.SrcPort == p {
|
|
||||||
clientPort = pkt.TCP.DestPort
|
|
||||||
break
|
|
||||||
}
|
|
||||||
if pkt.TCP.DestPort == p {
|
|
||||||
clientPort = pkt.TCP.SrcPort
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if clientPort != 0 {
|
|
||||||
// client Port can be assumed to have sufficient entropy for
|
|
||||||
// distribution among processors, and we want the same
|
|
||||||
// tcp stream to go to the same processor every time
|
|
||||||
// so that if we do proper packet reconstruction it will
|
|
||||||
// be easier.
|
|
||||||
select {
|
|
||||||
case processors[int(clientPort)%len(processors)] <- pkt:
|
|
||||||
default:
|
|
||||||
fmt.Fprint(os.Stderr, "Shedding load at router!")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// 1. parse args
|
|
||||||
// 2. start the loghisto metric system
|
|
||||||
// 3. start the processing and printing goroutines
|
|
||||||
// 4. open the pcap handler
|
|
||||||
// 5. hand off packets from the handler to the decoder
|
|
||||||
func main() {
|
|
||||||
portsArg := flag.String("ports", "2379,4001", "etcd listening ports")
|
|
||||||
iface := flag.String("iface", "eth0", "interface for sniffing traffic on")
|
|
||||||
promisc := flag.Bool("promiscuous", true, "promiscuous mode")
|
|
||||||
period := flag.Uint("period", 1, "seconds between submissions")
|
|
||||||
topK := flag.Uint("topk", 10, "submit stats for the top <K> sniffed paths")
|
|
||||||
flag.Parse()
|
|
||||||
|
|
||||||
numCPU := runtime.NumCPU()
|
|
||||||
runtime.GOMAXPROCS(numCPU)
|
|
||||||
|
|
||||||
ms := loghisto.NewMetricSystem(time.Duration(*period)*time.Second, false)
|
|
||||||
ms.Start()
|
|
||||||
metricStream := make(chan *loghisto.ProcessedMetricSet, 2)
|
|
||||||
ms.SubscribeToProcessedMetrics(metricStream)
|
|
||||||
defer ms.UnsubscribeFromProcessedMetrics(metricStream)
|
|
||||||
|
|
||||||
go statPrinter(metricStream, *topK, *period)
|
|
||||||
|
|
||||||
ports := []uint16{}
|
|
||||||
for _, p := range strings.Split(*portsArg, ",") {
|
|
||||||
port, err := strconv.Atoi(p)
|
|
||||||
if err == nil {
|
|
||||||
ports = append(ports, uint16(port))
|
|
||||||
} else {
|
|
||||||
fmt.Fprintf(os.Stderr, "Failed to parse port \"%s\": %v\n", p, err)
|
|
||||||
os.Exit(1)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(ports) == 0 {
|
|
||||||
fmt.Fprint(os.Stderr, "No ports given! Exiting.\n")
|
|
||||||
os.Exit(1)
|
|
||||||
}
|
|
||||||
|
|
||||||
// We choose 1518 for the snaplen because it's the default
|
|
||||||
// ethernet MTU at the link layer. We choose 1000 for the
|
|
||||||
// timeout based on a measurement for its impact on latency
|
|
||||||
// impact, but it is less precise.
|
|
||||||
h, err := pcap.Openlive(*iface, 1518, *promisc, 1000)
|
|
||||||
if err != nil {
|
|
||||||
fmt.Fprintf(os.Stderr, "%v", err)
|
|
||||||
os.Exit(1)
|
|
||||||
}
|
|
||||||
defer h.Close()
|
|
||||||
|
|
||||||
portArray := strings.Split(*portsArg, ",")
|
|
||||||
dst := strings.Join(portArray, " or dst port ")
|
|
||||||
src := strings.Join(portArray, " or src port ")
|
|
||||||
filter := fmt.Sprintf("tcp and (dst port %s or src port %s)", dst, src)
|
|
||||||
fmt.Println("using bpf filter: ", filter)
|
|
||||||
if err := h.Setfilter(filter); err != nil {
|
|
||||||
fmt.Fprintf(os.Stderr, "%v", err)
|
|
||||||
os.Exit(1)
|
|
||||||
}
|
|
||||||
|
|
||||||
unparsedPackets := make(chan *pcap.Packet, 16384)
|
|
||||||
parsedPackets := make(chan *pcap.Packet, 16384)
|
|
||||||
for i := 0; i < int(math.Max(2, float64(numCPU/4))); i++ {
|
|
||||||
go packetDecoder(unparsedPackets, parsedPackets)
|
|
||||||
}
|
|
||||||
|
|
||||||
processors := []chan *pcap.Packet{}
|
|
||||||
for i := 0; i < int(math.Max(2, float64(numCPU/4))); i++ {
|
|
||||||
p := make(chan *pcap.Packet, 16384)
|
|
||||||
processors = append(processors, p)
|
|
||||||
go processor(ms, p)
|
|
||||||
}
|
|
||||||
|
|
||||||
go streamRouter(ports, parsedPackets, processors)
|
|
||||||
|
|
||||||
for {
|
|
||||||
pkt := h.Next()
|
|
||||||
if pkt != nil {
|
|
||||||
select {
|
|
||||||
case unparsedPackets <- pkt:
|
|
||||||
default:
|
|
||||||
fmt.Fprint(os.Stderr, "SHEDDING IN MAIN")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -29,7 +29,7 @@ import (
|
||||||
var (
|
var (
|
||||||
// MinClusterVersion is the min cluster version this etcd binary is compatible with.
|
// MinClusterVersion is the min cluster version this etcd binary is compatible with.
|
||||||
MinClusterVersion = "2.2.0"
|
MinClusterVersion = "2.2.0"
|
||||||
Version = "2.3.7+git"
|
Version = "2.3.8"
|
||||||
|
|
||||||
// Git SHA Value will be set during build
|
// Git SHA Value will be set during build
|
||||||
GitSHA = "Not provided (use ./build instead of go build)"
|
GitSHA = "Not provided (use ./build instead of go build)"
|
||||||
|
|
Loading…
Reference in New Issue