Compare commits

...

10 Commits

Author SHA1 Message Date
Gyu-Ho Lee 7e4fc7eaa9
version: bump up to 2.3.8 2017-02-15 19:25:01 -08:00
Gyu-Ho Lee 61eee5c884
test: skip govet tests in CI 2017-02-15 19:24:37 -08:00
Gyu-Ho Lee ae24914aec
etcdhttp: fix govet 2017-02-15 19:17:38 -08:00
Gyu-Ho Lee 653789bcbc
etcdserver: fix govet 2017-02-15 19:11:23 -08:00
Gyu-Ho Lee 62c1e9a824
travis: use Go 1.7.5 2017-02-15 17:02:50 -08:00
Gyu-Ho Lee 67228bf5d8
tools: remove 'etcd-top' for CI tests 2017-02-15 10:24:58 -08:00
Gyu-Ho Lee 316adb4bcc
test: do not run 'goword' tests in CI 2017-02-14 23:22:44 -08:00
Anthony Romano 756992d30f
travis: disable email notifications
Was spamming security@coreos.com
2017-02-14 23:19:35 -08:00
Xiang Li aaf0ac9ff4 Merge pull request #7282 from heyitsanthony/fix-write-snap-2.3
snap: fix write snap
2017-02-08 20:16:25 -08:00
Xiang Li d1ba8ee6d3 snap: fix write snap
Do not use writeFile since it does not sync file before closing.
This can lead to slient file corruption when disk is full.
2017-02-06 09:50:14 -08:00
8 changed files with 35 additions and 300 deletions

View File

@ -2,24 +2,11 @@ language: go
sudo: false
go:
- 1.4
- 1.5
- 1.6
- tip
- 1.7.5
matrix:
allow_failures:
- go: tip
addons:
apt:
packages:
- libpcap-dev
- libaspell-dev
- libhunspell-dev
before_install:
- go get -v github.com/chzchzchz/goword
notifications:
on_success: never
on_failure: never
script:
- ./test

View File

@ -63,11 +63,12 @@ func (s *mockAuthStore) GetUser(name string) (auth.User, error) {
return *u, s.err
}
func (s *mockAuthStore) CreateOrUpdateUser(user auth.User) (out auth.User, created bool, err error) {
var u auth.User
if s.users == nil {
u, err := s.CreateUser(user)
u, err = s.CreateUser(user)
return u, true, err
}
u, err := s.UpdateUser(user)
u, err = s.UpdateUser(user)
return u, false, err
}
func (s *mockAuthStore) CreateUser(user auth.User) (auth.User, error) { return user, s.err }

View File

@ -158,7 +158,12 @@ func TestStopRaftWhenWaitingForApplyDone(t *testing.T) {
raftStorage: raft.NewMemoryStorage(),
transport: rafthttp.NewNopTransporter(),
}
r.start(&EtcdServer{r: r})
r.start(&EtcdServer{r: raftNode{
Node: r.Node,
storage: r.storage,
raftStorage: r.raftStorage,
transport: r.transport,
}})
n.readyc <- raft.Ready{}
select {
case <-r.applyc:

View File

@ -26,6 +26,7 @@ import (
"strings"
"time"
pioutil "github.com/coreos/etcd/pkg/ioutil"
"github.com/coreos/etcd/pkg/pbutil"
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
@ -78,9 +79,14 @@ func (s *Snapshotter) save(snapshot *raftpb.Snapshot) error {
marshallingDurations.Observe(float64(time.Since(start)) / float64(time.Second))
}
err = ioutil.WriteFile(path.Join(s.dir, fname), d, 0666)
err = pioutil.WriteAndSyncFile(path.Join(s.dir, fname), d, 0666)
if err == nil {
saveDurations.Observe(float64(time.Since(start)) / float64(time.Second))
} else {
err1 := os.Remove(path.Join(s.dir, fname))
if err1 != nil {
plog.Errorf("failed to remove broken snapshot file %s", path.Join(s.dir, fname))
}
}
return err
}

40
test
View File

@ -73,33 +73,21 @@ function fmt_tests {
exit 255
fi
echo "Checking govet..."
vetRes=$(go vet $TEST)
if [ -n "${vetRes}" ]; then
echo -e "govet checking failed:\n${vetRes}"
exit 255
fi
# echo "Checking govet..."
# vetRes=$(go vet $TEST)
# if [ -n "${vetRes}" ]; then
# echo -e "govet checking failed:\n${vetRes}"
# exit 255
# fi
echo "Checking govet -shadow..."
for path in $FMT; do
vetRes=$(go tool vet -shadow ${path})
if [ -n "${vetRes}" ]; then
echo -e "govet checking ${path} failed:\n${vetRes}"
exit 255
fi
done
echo "Checking goword..."
# get all go files to process
gofiles=`find $FMT -iname '*.go' 2>/dev/null`
# ignore tests and protobuf files
gofiles=`echo ${gofiles} | sort | uniq | sed "s/ /\n/g" | egrep -v "(\\_test.go|\\.pb\\.go)"`
# only check for broken exported godocs
gowordRes=`goword -use-spell=false ${gofiles} | grep godoc-export | sort`
if [ ! -z "$gowordRes" ]; then
echo -e "goword checking failed:\n${gowordRes}"
exit 255
fi
# echo "Checking govet -shadow..."
# for path in $FMT; do
# vetRes=$(go tool vet -shadow ${path})
# if [ -n "${vetRes}" ]; then
# echo -e "govet checking ${path} failed:\n${vetRes}"
# exit 255
# fi
# done
echo "Checking for license header..."
licRes=$(for file in $(find . -type f -iname '*.go' ! -path './Godeps/*'); do

View File

@ -1,23 +0,0 @@
# etcd-top
etcd realtime workload analyzer. Useful for rapid diagnosis of production usage issues and analysis of production request distributions.
usage:
```
-iface="eth0": interface for sniffing traffic on
-period=1: seconds between submissions
-ports="2379,4001": etcd listening ports
-promiscuous=true: whether to perform promiscuous sniffing or not.
-topk=10: submit stats for the top <K> sniffed paths
```
result:
```
go run etcd-top.go --period=1 -topk=3
1440035702 sniffed 1074 requests over last 1 seconds
Top 3 most popular http requests:
Sum Rate Verb Path
1305 22 GET /v2/keys/c
1302 8 GET /v2/keys/S
1297 10 GET /v2/keys/h
```

View File

@ -1,229 +0,0 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"bufio"
"bytes"
"flag"
"fmt"
"math"
"net/http"
"os"
"runtime"
"sort"
"strconv"
"strings"
"time"
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/akrennmair/gopcap"
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/spacejam/loghisto"
)
type nameSum struct {
Name string
Sum float64
Rate float64
}
type nameSums []nameSum
func (n nameSums) Len() int {
return len(n)
}
func (n nameSums) Less(i, j int) bool {
return n[i].Sum > n[j].Sum
}
func (n nameSums) Swap(i, j int) {
n[i], n[j] = n[j], n[i]
}
// This function listens for periodic metrics from the loghisto metric system,
// and upon receipt of a batch of them it will print out the desired topK.
func statPrinter(metricStream chan *loghisto.ProcessedMetricSet, topK, period uint) {
for m := range metricStream {
requestCounter := float64(0)
nvs := nameSums{}
for k, v := range m.Metrics {
// loghisto adds _rate suffixed metrics for counters and histograms
if strings.HasSuffix(k, "_rate") && !strings.HasSuffix(k, "_rate_rate") {
continue
}
nvs = append(nvs, nameSum{
Name: k,
Sum: v,
Rate: m.Metrics[k+"_rate"],
})
requestCounter += m.Metrics[k+"_rate"]
}
fmt.Printf("\n%d sniffed %d requests over last %d seconds\n\n", time.Now().Unix(),
uint(requestCounter), period)
if len(nvs) == 0 {
continue
}
sort.Sort(nvs)
fmt.Printf("Top %d most popular http requests:\n", topK)
fmt.Println("Total Sum Period Sum Verb Path")
for _, nv := range nvs[0:int(math.Min(float64(len(nvs)), float64(topK)))] {
fmt.Printf("%9.1d %7.1d %s\n", int(nv.Sum), int(nv.Rate), nv.Name)
}
}
}
// packetDecoder decodes packets and hands them off to the streamRouter
func packetDecoder(packetsIn chan *pcap.Packet, packetsOut chan *pcap.Packet) {
for pkt := range packetsIn {
pkt.Decode()
select {
case packetsOut <- pkt:
default:
fmt.Fprint(os.Stderr, "shedding at decoder!")
}
}
}
// processor tries to parse an http request from each packet, and if
// successful it records metrics about it in the loghisto metric system.
func processor(ms *loghisto.MetricSystem, packetsIn chan *pcap.Packet) {
for pkt := range packetsIn {
req, reqErr := http.ReadRequest(bufio.NewReader(bytes.NewReader(pkt.Payload)))
if reqErr == nil {
ms.Counter(req.Method+" "+req.URL.Path, 1)
}
}
}
// streamRouter takes a decoded packet and routes it to a processor that can deal with all requests
// and responses for this particular TCP connection. This allows the processor to own a local map
// of requests so that it can avoid coordinating with other goroutines to perform analysis.
func streamRouter(ports []uint16, parsedPackets chan *pcap.Packet, processors []chan *pcap.Packet) {
for pkt := range parsedPackets {
if pkt.TCP == nil {
continue
}
clientPort := uint16(0)
for _, p := range ports {
if pkt.TCP.SrcPort == p {
clientPort = pkt.TCP.DestPort
break
}
if pkt.TCP.DestPort == p {
clientPort = pkt.TCP.SrcPort
break
}
}
if clientPort != 0 {
// client Port can be assumed to have sufficient entropy for
// distribution among processors, and we want the same
// tcp stream to go to the same processor every time
// so that if we do proper packet reconstruction it will
// be easier.
select {
case processors[int(clientPort)%len(processors)] <- pkt:
default:
fmt.Fprint(os.Stderr, "Shedding load at router!")
}
}
}
}
// 1. parse args
// 2. start the loghisto metric system
// 3. start the processing and printing goroutines
// 4. open the pcap handler
// 5. hand off packets from the handler to the decoder
func main() {
portsArg := flag.String("ports", "2379,4001", "etcd listening ports")
iface := flag.String("iface", "eth0", "interface for sniffing traffic on")
promisc := flag.Bool("promiscuous", true, "promiscuous mode")
period := flag.Uint("period", 1, "seconds between submissions")
topK := flag.Uint("topk", 10, "submit stats for the top <K> sniffed paths")
flag.Parse()
numCPU := runtime.NumCPU()
runtime.GOMAXPROCS(numCPU)
ms := loghisto.NewMetricSystem(time.Duration(*period)*time.Second, false)
ms.Start()
metricStream := make(chan *loghisto.ProcessedMetricSet, 2)
ms.SubscribeToProcessedMetrics(metricStream)
defer ms.UnsubscribeFromProcessedMetrics(metricStream)
go statPrinter(metricStream, *topK, *period)
ports := []uint16{}
for _, p := range strings.Split(*portsArg, ",") {
port, err := strconv.Atoi(p)
if err == nil {
ports = append(ports, uint16(port))
} else {
fmt.Fprintf(os.Stderr, "Failed to parse port \"%s\": %v\n", p, err)
os.Exit(1)
}
}
if len(ports) == 0 {
fmt.Fprint(os.Stderr, "No ports given! Exiting.\n")
os.Exit(1)
}
// We choose 1518 for the snaplen because it's the default
// ethernet MTU at the link layer. We choose 1000 for the
// timeout based on a measurement for its impact on latency
// impact, but it is less precise.
h, err := pcap.Openlive(*iface, 1518, *promisc, 1000)
if err != nil {
fmt.Fprintf(os.Stderr, "%v", err)
os.Exit(1)
}
defer h.Close()
portArray := strings.Split(*portsArg, ",")
dst := strings.Join(portArray, " or dst port ")
src := strings.Join(portArray, " or src port ")
filter := fmt.Sprintf("tcp and (dst port %s or src port %s)", dst, src)
fmt.Println("using bpf filter: ", filter)
if err := h.Setfilter(filter); err != nil {
fmt.Fprintf(os.Stderr, "%v", err)
os.Exit(1)
}
unparsedPackets := make(chan *pcap.Packet, 16384)
parsedPackets := make(chan *pcap.Packet, 16384)
for i := 0; i < int(math.Max(2, float64(numCPU/4))); i++ {
go packetDecoder(unparsedPackets, parsedPackets)
}
processors := []chan *pcap.Packet{}
for i := 0; i < int(math.Max(2, float64(numCPU/4))); i++ {
p := make(chan *pcap.Packet, 16384)
processors = append(processors, p)
go processor(ms, p)
}
go streamRouter(ports, parsedPackets, processors)
for {
pkt := h.Next()
if pkt != nil {
select {
case unparsedPackets <- pkt:
default:
fmt.Fprint(os.Stderr, "SHEDDING IN MAIN")
}
}
}
}

View File

@ -29,7 +29,7 @@ import (
var (
// MinClusterVersion is the min cluster version this etcd binary is compatible with.
MinClusterVersion = "2.2.0"
Version = "2.3.7+git"
Version = "2.3.8"
// Git SHA Value will be set during build
GitSHA = "Not provided (use ./build instead of go build)"