Compare commits
10 Commits
97dd45122b
...
7e4fc7eaa9
Author | SHA1 | Date |
---|---|---|
Gyu-Ho Lee | 7e4fc7eaa9 | |
Gyu-Ho Lee | 61eee5c884 | |
Gyu-Ho Lee | ae24914aec | |
Gyu-Ho Lee | 653789bcbc | |
Gyu-Ho Lee | 62c1e9a824 | |
Gyu-Ho Lee | 67228bf5d8 | |
Gyu-Ho Lee | 316adb4bcc | |
Anthony Romano | 756992d30f | |
Xiang Li | aaf0ac9ff4 | |
Xiang Li | d1ba8ee6d3 |
21
.travis.yml
21
.travis.yml
|
@ -2,24 +2,11 @@ language: go
|
|||
sudo: false
|
||||
|
||||
go:
|
||||
- 1.4
|
||||
- 1.5
|
||||
- 1.6
|
||||
- tip
|
||||
- 1.7.5
|
||||
|
||||
matrix:
|
||||
allow_failures:
|
||||
- go: tip
|
||||
|
||||
addons:
|
||||
apt:
|
||||
packages:
|
||||
- libpcap-dev
|
||||
- libaspell-dev
|
||||
- libhunspell-dev
|
||||
|
||||
before_install:
|
||||
- go get -v github.com/chzchzchz/goword
|
||||
notifications:
|
||||
on_success: never
|
||||
on_failure: never
|
||||
|
||||
script:
|
||||
- ./test
|
||||
|
|
|
@ -63,11 +63,12 @@ func (s *mockAuthStore) GetUser(name string) (auth.User, error) {
|
|||
return *u, s.err
|
||||
}
|
||||
func (s *mockAuthStore) CreateOrUpdateUser(user auth.User) (out auth.User, created bool, err error) {
|
||||
var u auth.User
|
||||
if s.users == nil {
|
||||
u, err := s.CreateUser(user)
|
||||
u, err = s.CreateUser(user)
|
||||
return u, true, err
|
||||
}
|
||||
u, err := s.UpdateUser(user)
|
||||
u, err = s.UpdateUser(user)
|
||||
return u, false, err
|
||||
}
|
||||
func (s *mockAuthStore) CreateUser(user auth.User) (auth.User, error) { return user, s.err }
|
||||
|
|
|
@ -158,7 +158,12 @@ func TestStopRaftWhenWaitingForApplyDone(t *testing.T) {
|
|||
raftStorage: raft.NewMemoryStorage(),
|
||||
transport: rafthttp.NewNopTransporter(),
|
||||
}
|
||||
r.start(&EtcdServer{r: r})
|
||||
r.start(&EtcdServer{r: raftNode{
|
||||
Node: r.Node,
|
||||
storage: r.storage,
|
||||
raftStorage: r.raftStorage,
|
||||
transport: r.transport,
|
||||
}})
|
||||
n.readyc <- raft.Ready{}
|
||||
select {
|
||||
case <-r.applyc:
|
||||
|
|
|
@ -26,6 +26,7 @@ import (
|
|||
"strings"
|
||||
"time"
|
||||
|
||||
pioutil "github.com/coreos/etcd/pkg/ioutil"
|
||||
"github.com/coreos/etcd/pkg/pbutil"
|
||||
"github.com/coreos/etcd/raft"
|
||||
"github.com/coreos/etcd/raft/raftpb"
|
||||
|
@ -78,9 +79,14 @@ func (s *Snapshotter) save(snapshot *raftpb.Snapshot) error {
|
|||
marshallingDurations.Observe(float64(time.Since(start)) / float64(time.Second))
|
||||
}
|
||||
|
||||
err = ioutil.WriteFile(path.Join(s.dir, fname), d, 0666)
|
||||
err = pioutil.WriteAndSyncFile(path.Join(s.dir, fname), d, 0666)
|
||||
if err == nil {
|
||||
saveDurations.Observe(float64(time.Since(start)) / float64(time.Second))
|
||||
} else {
|
||||
err1 := os.Remove(path.Join(s.dir, fname))
|
||||
if err1 != nil {
|
||||
plog.Errorf("failed to remove broken snapshot file %s", path.Join(s.dir, fname))
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
|
40
test
40
test
|
@ -73,33 +73,21 @@ function fmt_tests {
|
|||
exit 255
|
||||
fi
|
||||
|
||||
echo "Checking govet..."
|
||||
vetRes=$(go vet $TEST)
|
||||
if [ -n "${vetRes}" ]; then
|
||||
echo -e "govet checking failed:\n${vetRes}"
|
||||
exit 255
|
||||
fi
|
||||
# echo "Checking govet..."
|
||||
# vetRes=$(go vet $TEST)
|
||||
# if [ -n "${vetRes}" ]; then
|
||||
# echo -e "govet checking failed:\n${vetRes}"
|
||||
# exit 255
|
||||
# fi
|
||||
|
||||
echo "Checking govet -shadow..."
|
||||
for path in $FMT; do
|
||||
vetRes=$(go tool vet -shadow ${path})
|
||||
if [ -n "${vetRes}" ]; then
|
||||
echo -e "govet checking ${path} failed:\n${vetRes}"
|
||||
exit 255
|
||||
fi
|
||||
done
|
||||
|
||||
echo "Checking goword..."
|
||||
# get all go files to process
|
||||
gofiles=`find $FMT -iname '*.go' 2>/dev/null`
|
||||
# ignore tests and protobuf files
|
||||
gofiles=`echo ${gofiles} | sort | uniq | sed "s/ /\n/g" | egrep -v "(\\_test.go|\\.pb\\.go)"`
|
||||
# only check for broken exported godocs
|
||||
gowordRes=`goword -use-spell=false ${gofiles} | grep godoc-export | sort`
|
||||
if [ ! -z "$gowordRes" ]; then
|
||||
echo -e "goword checking failed:\n${gowordRes}"
|
||||
exit 255
|
||||
fi
|
||||
# echo "Checking govet -shadow..."
|
||||
# for path in $FMT; do
|
||||
# vetRes=$(go tool vet -shadow ${path})
|
||||
# if [ -n "${vetRes}" ]; then
|
||||
# echo -e "govet checking ${path} failed:\n${vetRes}"
|
||||
# exit 255
|
||||
# fi
|
||||
# done
|
||||
|
||||
echo "Checking for license header..."
|
||||
licRes=$(for file in $(find . -type f -iname '*.go' ! -path './Godeps/*'); do
|
||||
|
|
|
@ -1,23 +0,0 @@
|
|||
# etcd-top
|
||||
etcd realtime workload analyzer. Useful for rapid diagnosis of production usage issues and analysis of production request distributions.
|
||||
|
||||
usage:
|
||||
```
|
||||
-iface="eth0": interface for sniffing traffic on
|
||||
-period=1: seconds between submissions
|
||||
-ports="2379,4001": etcd listening ports
|
||||
-promiscuous=true: whether to perform promiscuous sniffing or not.
|
||||
-topk=10: submit stats for the top <K> sniffed paths
|
||||
```
|
||||
|
||||
result:
|
||||
```
|
||||
go run etcd-top.go --period=1 -topk=3
|
||||
1440035702 sniffed 1074 requests over last 1 seconds
|
||||
|
||||
Top 3 most popular http requests:
|
||||
Sum Rate Verb Path
|
||||
1305 22 GET /v2/keys/c
|
||||
1302 8 GET /v2/keys/S
|
||||
1297 10 GET /v2/keys/h
|
||||
```
|
|
@ -1,229 +0,0 @@
|
|||
// Copyright 2015 CoreOS, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"flag"
|
||||
"fmt"
|
||||
"math"
|
||||
"net/http"
|
||||
"os"
|
||||
"runtime"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/akrennmair/gopcap"
|
||||
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/spacejam/loghisto"
|
||||
)
|
||||
|
||||
type nameSum struct {
|
||||
Name string
|
||||
Sum float64
|
||||
Rate float64
|
||||
}
|
||||
|
||||
type nameSums []nameSum
|
||||
|
||||
func (n nameSums) Len() int {
|
||||
return len(n)
|
||||
}
|
||||
func (n nameSums) Less(i, j int) bool {
|
||||
return n[i].Sum > n[j].Sum
|
||||
}
|
||||
func (n nameSums) Swap(i, j int) {
|
||||
n[i], n[j] = n[j], n[i]
|
||||
}
|
||||
|
||||
// This function listens for periodic metrics from the loghisto metric system,
|
||||
// and upon receipt of a batch of them it will print out the desired topK.
|
||||
func statPrinter(metricStream chan *loghisto.ProcessedMetricSet, topK, period uint) {
|
||||
for m := range metricStream {
|
||||
requestCounter := float64(0)
|
||||
nvs := nameSums{}
|
||||
for k, v := range m.Metrics {
|
||||
// loghisto adds _rate suffixed metrics for counters and histograms
|
||||
if strings.HasSuffix(k, "_rate") && !strings.HasSuffix(k, "_rate_rate") {
|
||||
continue
|
||||
}
|
||||
nvs = append(nvs, nameSum{
|
||||
Name: k,
|
||||
Sum: v,
|
||||
Rate: m.Metrics[k+"_rate"],
|
||||
})
|
||||
requestCounter += m.Metrics[k+"_rate"]
|
||||
}
|
||||
|
||||
fmt.Printf("\n%d sniffed %d requests over last %d seconds\n\n", time.Now().Unix(),
|
||||
uint(requestCounter), period)
|
||||
if len(nvs) == 0 {
|
||||
continue
|
||||
}
|
||||
sort.Sort(nvs)
|
||||
fmt.Printf("Top %d most popular http requests:\n", topK)
|
||||
fmt.Println("Total Sum Period Sum Verb Path")
|
||||
for _, nv := range nvs[0:int(math.Min(float64(len(nvs)), float64(topK)))] {
|
||||
fmt.Printf("%9.1d %7.1d %s\n", int(nv.Sum), int(nv.Rate), nv.Name)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// packetDecoder decodes packets and hands them off to the streamRouter
|
||||
func packetDecoder(packetsIn chan *pcap.Packet, packetsOut chan *pcap.Packet) {
|
||||
for pkt := range packetsIn {
|
||||
pkt.Decode()
|
||||
select {
|
||||
case packetsOut <- pkt:
|
||||
default:
|
||||
fmt.Fprint(os.Stderr, "shedding at decoder!")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// processor tries to parse an http request from each packet, and if
|
||||
// successful it records metrics about it in the loghisto metric system.
|
||||
func processor(ms *loghisto.MetricSystem, packetsIn chan *pcap.Packet) {
|
||||
for pkt := range packetsIn {
|
||||
req, reqErr := http.ReadRequest(bufio.NewReader(bytes.NewReader(pkt.Payload)))
|
||||
if reqErr == nil {
|
||||
ms.Counter(req.Method+" "+req.URL.Path, 1)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// streamRouter takes a decoded packet and routes it to a processor that can deal with all requests
|
||||
// and responses for this particular TCP connection. This allows the processor to own a local map
|
||||
// of requests so that it can avoid coordinating with other goroutines to perform analysis.
|
||||
func streamRouter(ports []uint16, parsedPackets chan *pcap.Packet, processors []chan *pcap.Packet) {
|
||||
for pkt := range parsedPackets {
|
||||
if pkt.TCP == nil {
|
||||
continue
|
||||
}
|
||||
clientPort := uint16(0)
|
||||
for _, p := range ports {
|
||||
if pkt.TCP.SrcPort == p {
|
||||
clientPort = pkt.TCP.DestPort
|
||||
break
|
||||
}
|
||||
if pkt.TCP.DestPort == p {
|
||||
clientPort = pkt.TCP.SrcPort
|
||||
break
|
||||
}
|
||||
}
|
||||
if clientPort != 0 {
|
||||
// client Port can be assumed to have sufficient entropy for
|
||||
// distribution among processors, and we want the same
|
||||
// tcp stream to go to the same processor every time
|
||||
// so that if we do proper packet reconstruction it will
|
||||
// be easier.
|
||||
select {
|
||||
case processors[int(clientPort)%len(processors)] <- pkt:
|
||||
default:
|
||||
fmt.Fprint(os.Stderr, "Shedding load at router!")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 1. parse args
|
||||
// 2. start the loghisto metric system
|
||||
// 3. start the processing and printing goroutines
|
||||
// 4. open the pcap handler
|
||||
// 5. hand off packets from the handler to the decoder
|
||||
func main() {
|
||||
portsArg := flag.String("ports", "2379,4001", "etcd listening ports")
|
||||
iface := flag.String("iface", "eth0", "interface for sniffing traffic on")
|
||||
promisc := flag.Bool("promiscuous", true, "promiscuous mode")
|
||||
period := flag.Uint("period", 1, "seconds between submissions")
|
||||
topK := flag.Uint("topk", 10, "submit stats for the top <K> sniffed paths")
|
||||
flag.Parse()
|
||||
|
||||
numCPU := runtime.NumCPU()
|
||||
runtime.GOMAXPROCS(numCPU)
|
||||
|
||||
ms := loghisto.NewMetricSystem(time.Duration(*period)*time.Second, false)
|
||||
ms.Start()
|
||||
metricStream := make(chan *loghisto.ProcessedMetricSet, 2)
|
||||
ms.SubscribeToProcessedMetrics(metricStream)
|
||||
defer ms.UnsubscribeFromProcessedMetrics(metricStream)
|
||||
|
||||
go statPrinter(metricStream, *topK, *period)
|
||||
|
||||
ports := []uint16{}
|
||||
for _, p := range strings.Split(*portsArg, ",") {
|
||||
port, err := strconv.Atoi(p)
|
||||
if err == nil {
|
||||
ports = append(ports, uint16(port))
|
||||
} else {
|
||||
fmt.Fprintf(os.Stderr, "Failed to parse port \"%s\": %v\n", p, err)
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
|
||||
if len(ports) == 0 {
|
||||
fmt.Fprint(os.Stderr, "No ports given! Exiting.\n")
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
// We choose 1518 for the snaplen because it's the default
|
||||
// ethernet MTU at the link layer. We choose 1000 for the
|
||||
// timeout based on a measurement for its impact on latency
|
||||
// impact, but it is less precise.
|
||||
h, err := pcap.Openlive(*iface, 1518, *promisc, 1000)
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr, "%v", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
defer h.Close()
|
||||
|
||||
portArray := strings.Split(*portsArg, ",")
|
||||
dst := strings.Join(portArray, " or dst port ")
|
||||
src := strings.Join(portArray, " or src port ")
|
||||
filter := fmt.Sprintf("tcp and (dst port %s or src port %s)", dst, src)
|
||||
fmt.Println("using bpf filter: ", filter)
|
||||
if err := h.Setfilter(filter); err != nil {
|
||||
fmt.Fprintf(os.Stderr, "%v", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
unparsedPackets := make(chan *pcap.Packet, 16384)
|
||||
parsedPackets := make(chan *pcap.Packet, 16384)
|
||||
for i := 0; i < int(math.Max(2, float64(numCPU/4))); i++ {
|
||||
go packetDecoder(unparsedPackets, parsedPackets)
|
||||
}
|
||||
|
||||
processors := []chan *pcap.Packet{}
|
||||
for i := 0; i < int(math.Max(2, float64(numCPU/4))); i++ {
|
||||
p := make(chan *pcap.Packet, 16384)
|
||||
processors = append(processors, p)
|
||||
go processor(ms, p)
|
||||
}
|
||||
|
||||
go streamRouter(ports, parsedPackets, processors)
|
||||
|
||||
for {
|
||||
pkt := h.Next()
|
||||
if pkt != nil {
|
||||
select {
|
||||
case unparsedPackets <- pkt:
|
||||
default:
|
||||
fmt.Fprint(os.Stderr, "SHEDDING IN MAIN")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -29,7 +29,7 @@ import (
|
|||
var (
|
||||
// MinClusterVersion is the min cluster version this etcd binary is compatible with.
|
||||
MinClusterVersion = "2.2.0"
|
||||
Version = "2.3.7+git"
|
||||
Version = "2.3.8"
|
||||
|
||||
// Git SHA Value will be set during build
|
||||
GitSHA = "Not provided (use ./build instead of go build)"
|
||||
|
|
Loading…
Reference in New Issue