Godeps: add dependency for etcd-top

release-2.3
Xiang Li 2015-11-01 17:44:19 -08:00
parent 00557e96af
commit 2bfe995fb8
22 changed files with 3250 additions and 0 deletions

8
Godeps/Godeps.json generated
View File

@ -10,6 +10,10 @@
"Comment": "null-5",
"Rev": "75cd24fc2f2c2a2088577d12123ddee5f54e0675"
},
{
"ImportPath": "github.com/akrennmair/gopcap",
"Rev": "00e11033259acb75598ba416495bb708d864a010"
},
{
"ImportPath": "github.com/beorn7/perks/quantile",
"Rev": "b965b613227fddccbfffe13eae360ed3fa822f8d"
@ -105,6 +109,10 @@
"ImportPath": "github.com/prometheus/procfs",
"Rev": "454a56f35412459b5e684fd5ec0f9211b94f002a"
},
{
"ImportPath": "github.com/spacejam/loghisto",
"Rev": "323309774dec8b7430187e46cd0793974ccca04a"
},
{
"ImportPath": "github.com/stretchr/testify/assert",
"Rev": "9cc77fa25329013ce07362c7742952ff887361f2"

View File

@ -0,0 +1,5 @@
#*
*~
/tools/pass/pass
/tools/pcaptest/pcaptest
/tools/tcpdump/tcpdump

View File

@ -0,0 +1,27 @@
Copyright (c) 2009-2011 Andreas Krennmair. All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above
copyright notice, this list of conditions and the following disclaimer
in the documentation and/or other materials provided with the
distribution.
* Neither the name of Andreas Krennmair nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

View File

@ -0,0 +1,11 @@
# PCAP
This is a simple wrapper around libpcap for Go. Originally written by Andreas
Krennmair <ak@synflood.at> and only lightly touched up by Mark Smith <mark@qq.is>.
Please see the included pcaptest.go and tcpdump.go programs for instructions on
how to use this library.
Miek Gieben <miek@miek.nl> has created a more Go-like package and replaced functionality
with standard functions from the standard library. The package has also been renamed to
pcap.

View File

@ -0,0 +1,527 @@
package pcap
import (
"encoding/binary"
"fmt"
"net"
"reflect"
"strings"
)
// EtherType values (Ethernet header "type" field) and IPv4/IPv6
// protocol numbers recognized by the decoder.
const (
	TYPE_IP   = 0x0800 // IPv4
	TYPE_ARP  = 0x0806 // ARP
	TYPE_IP6  = 0x86DD // IPv6
	TYPE_VLAN = 0x8100 // 802.1Q VLAN tag

	IP_ICMP = 1  // ICMP
	IP_INIP = 4  // IP-in-IP encapsulation
	IP_TCP  = 6  // TCP
	IP_UDP  = 17 // UDP
)
// Link-layer header types, according to pcap-linktype(7).
const (
	ERRBUF_SIZE = 256 // size of the libpcap error-message buffer

	LINKTYPE_NULL             = 0
	LINKTYPE_ETHERNET         = 1
	LINKTYPE_TOKEN_RING       = 6
	LINKTYPE_ARCNET           = 7
	LINKTYPE_SLIP             = 8
	LINKTYPE_PPP              = 9
	LINKTYPE_FDDI             = 10
	LINKTYPE_ATM_RFC1483      = 100
	LINKTYPE_RAW              = 101
	LINKTYPE_PPP_HDLC         = 50
	LINKTYPE_PPP_ETHER        = 51
	LINKTYPE_C_HDLC           = 104
	LINKTYPE_IEEE802_11       = 105
	LINKTYPE_FRELAY           = 107
	LINKTYPE_LOOP             = 108
	LINKTYPE_LINUX_SLL        = 113
	// BUG FIX: LINKTYPE_LTALK was 104, colliding with LINKTYPE_C_HDLC;
	// pcap-linktype(7) defines LINKTYPE_LTALK as 114.
	LINKTYPE_LTALK            = 114
	LINKTYPE_PFLOG            = 117
	LINKTYPE_PRISM_HEADER     = 119
	LINKTYPE_IP_OVER_FC       = 122
	LINKTYPE_SUNATM           = 123
	LINKTYPE_IEEE802_11_RADIO = 127
	LINKTYPE_ARCNET_LINUX     = 129
	LINKTYPE_LINUX_IRDA       = 144
	LINKTYPE_LINUX_LAPD       = 177
)
// addrHdr is implemented by address-bearing headers (IPv4/IPv6) so
// transport-layer headers can print their endpoint addresses.
type addrHdr interface {
	SrcAddr() string
	DestAddr() string
	Len() int
}

// addrStringer is implemented by headers (TCP/UDP/ICMP) that format
// themselves given the enclosing address header.
type addrStringer interface {
	String(addr addrHdr) string
}
// decodemac packs the first six bytes of pkt into a uint64 MAC value,
// most significant byte first. Bytes beyond the sixth are ignored.
func decodemac(pkt []byte) uint64 {
	var mac uint64
	for _, b := range pkt[:6] {
		mac = mac<<8 | uint64(b)
	}
	return mac
}
// Decode decodes the headers of a Packet.
func (p *Packet) Decode() {
	data := p.Data
	if len(data) <= 14 {
		return // too short for an Ethernet header
	}
	p.DestMac = decodemac(data[0:6])
	p.SrcMac = decodemac(data[6:12])
	etherType := int(binary.BigEndian.Uint16(data[12:14]))
	p.Type = etherType
	if len(data) >= 15 { // always true after the guard above; kept from the original
		p.Payload = data[14:]
	}
	switch etherType {
	case TYPE_IP:
		p.decodeIp()
	case TYPE_IP6:
		p.decodeIp6()
	case TYPE_ARP:
		p.decodeArp()
	case TYPE_VLAN:
		p.decodeVlan()
	}
}
// headerString renders the given decoded header chain as a one-line,
// tcpdump-style summary.
func (p *Packet) headerString(headers []interface{}) string {
	// If there's just one header, return that.
	if len(headers) == 1 {
		if hdr, ok := headers[0].(fmt.Stringer); ok {
			return hdr.String()
		}
	}
	// If there are two headers (IPv4/IPv6 -> TCP/UDP/IP..)
	// BUG FIX: this branch used to inspect p.Headers instead of the
	// headers argument, which is wrong in the recursive (IP-in-IP) case;
	// at the top level headers IS p.Headers, so that case is unchanged.
	if len(headers) == 2 {
		// Commonly the first header is an address.
		if addr, ok := headers[0].(addrHdr); ok {
			if hdr, ok := headers[1].(addrStringer); ok {
				return fmt.Sprintf("%s %s", p.Time, hdr.String(addr))
			}
		}
	}
	// For IP in IP, we do a recursive call.
	if len(headers) >= 2 {
		if addr, ok := headers[0].(addrHdr); ok {
			if _, ok := headers[1].(addrHdr); ok {
				// BUG FIX: the format string had only two verbs for three
				// arguments, so the recursive part printed as "%!(EXTRA ...)".
				return fmt.Sprintf("%s > %s IP in IP: %s",
					addr.SrcAddr(), addr.DestAddr(), p.headerString(headers[1:]))
			}
		}
	}
	// Fallback: list the Go types of the unrecognized headers.
	var typeNames []string
	for _, hdr := range headers {
		typeNames = append(typeNames, reflect.TypeOf(hdr).String())
	}
	return fmt.Sprintf("unknown [%s]", strings.Join(typeNames, ","))
}
// String prints a one-line representation of the packet header.
// The output is suitable for use in a tcpdump program.
func (p *Packet) String() string {
	if len(p.Headers) > 0 {
		return fmt.Sprintf("%s %s", p.Time, p.headerString(p.Headers))
	}
	// Nothing was decoded: report the raw EtherType instead.
	return fmt.Sprintf("%s unsupported protocol %d", p.Time, int(p.Type))
}
// Arphdr is an ARP packet header.
type Arphdr struct {
	Addrtype          uint16 // link-layer address type (e.g. LINKTYPE_ETHERNET)
	Protocol          uint16 // protocol address type (e.g. TYPE_IP)
	HwAddressSize     uint8  // hardware address length in bytes
	ProtAddressSize   uint8  // protocol address length in bytes
	Operation         uint16 // 1 = request, 2 = reply
	SourceHwAddress   []byte
	SourceProtAddress []byte
	DestHwAddress     []byte
	DestProtAddress   []byte
}
// String returns a one-line summary of the ARP header, prefixed with
// the operation name when it is a known request/reply.
func (arp *Arphdr) String() (s string) {
	// BUG FIX: the operation string used to be computed and then
	// unconditionally overwritten below; keep it as a prefix instead.
	prefix := ""
	switch arp.Operation {
	case 1:
		prefix = "ARP request "
	case 2:
		prefix = "ARP Reply "
	}
	if arp.Addrtype == LINKTYPE_ETHERNET && arp.Protocol == TYPE_IP {
		s = prefix + fmt.Sprintf("%012x (%s) > %012x (%s)",
			decodemac(arp.SourceHwAddress), arp.SourceProtAddress,
			decodemac(arp.DestHwAddress), arp.DestProtAddress)
	} else {
		s = prefix + fmt.Sprintf("addrtype = %d protocol = %d", arp.Addrtype, arp.Protocol)
	}
	return
}
// decodeArp parses an ARP header out of p.Payload, appends it to
// p.Headers, and advances p.Payload past the ARP data.
func (p *Packet) decodeArp() {
	if len(p.Payload) < 8 {
		return // shorter than the fixed part of an ARP header
	}
	pkt := p.Payload
	arp := new(Arphdr)
	arp.Addrtype = binary.BigEndian.Uint16(pkt[0:2])
	arp.Protocol = binary.BigEndian.Uint16(pkt[2:4])
	arp.HwAddressSize = pkt[4]
	arp.ProtAddressSize = pkt[5]
	arp.Operation = binary.BigEndian.Uint16(pkt[6:8])
	// BUG FIX: compute the variable-length offsets in int, not uint8.
	// The original expression (8 + 2*HwAddressSize + 2*ProtAddressSize)
	// was evaluated in uint8 and wrapped for large (malformed/fuzzed)
	// address sizes, producing wrong bounds checks and slice indices.
	hw, pr := int(arp.HwAddressSize), int(arp.ProtAddressSize)
	end := 8 + 2*hw + 2*pr
	if len(pkt) < end {
		return
	}
	arp.SourceHwAddress = pkt[8 : 8+hw]
	arp.SourceProtAddress = pkt[8+hw : 8+hw+pr]
	arp.DestHwAddress = pkt[8+hw+pr : 8+2*hw+pr]
	arp.DestProtAddress = pkt[8+2*hw+pr : end]
	p.Headers = append(p.Headers, arp)
	p.Payload = p.Payload[end:]
}
// Iphdr is the header of an IPv4 packet.
type Iphdr struct {
	Version    uint8  // always 4 for IPv4
	Ihl        uint8  // header length in 32-bit words
	Tos        uint8
	Length     uint16 // total length (header + payload) in bytes
	Id         uint16
	Flags      uint8  // top 3 bits of the flags/fragment word
	FragOffset uint16 // low 13 bits of the flags/fragment word
	Ttl        uint8
	Protocol   uint8 // IP_* protocol number of the payload
	Checksum   uint16
	SrcIp      []byte // 4 bytes; aliases the packet buffer
	DestIp     []byte // 4 bytes; aliases the packet buffer
}
// decodeIp parses an IPv4 header out of p.Payload, appends it to
// p.Headers, sets p.IP, and dispatches to the next-layer decoder.
func (p *Packet) decodeIp() {
	if len(p.Payload) < 20 {
		return // shorter than a minimal IPv4 header
	}
	pkt := p.Payload
	ip := new(Iphdr)
	ip.Version = uint8(pkt[0]) >> 4
	ip.Ihl = uint8(pkt[0]) & 0x0F
	ip.Tos = pkt[1]
	ip.Length = binary.BigEndian.Uint16(pkt[2:4])
	ip.Id = binary.BigEndian.Uint16(pkt[4:6])
	flagsfrags := binary.BigEndian.Uint16(pkt[6:8])
	ip.Flags = uint8(flagsfrags >> 13)
	ip.FragOffset = flagsfrags & 0x1FFF
	ip.Ttl = pkt[8]
	ip.Protocol = pkt[9]
	ip.Checksum = binary.BigEndian.Uint16(pkt[10:12])
	ip.SrcIp = pkt[12:16]
	ip.DestIp = pkt[16:20]
	// Clamp the payload end to the captured bytes so a too-large IP
	// total-length field (or a truncated capture) cannot slice past pkt.
	// This also trims any link-layer trailer padding after the IP data.
	pEnd := int(ip.Length)
	if pEnd > len(pkt) {
		pEnd = len(pkt)
	}
	if len(pkt) >= pEnd && int(ip.Ihl*4) < pEnd {
		p.Payload = pkt[ip.Ihl*4 : pEnd]
	} else {
		p.Payload = []byte{}
	}
	p.Headers = append(p.Headers, ip)
	p.IP = ip
	switch ip.Protocol {
	case IP_TCP:
		p.decodeTcp()
	case IP_UDP:
		p.decodeUdp()
	case IP_ICMP:
		p.decodeIcmp()
	case IP_INIP:
		// IP-in-IP: decode the inner IPv4 packet recursively.
		p.decodeIp()
	}
}
// SrcAddr returns the textual source address (addrHdr interface).
func (ip *Iphdr) SrcAddr() string { return net.IP(ip.SrcIp).String() }

// DestAddr returns the textual destination address (addrHdr interface).
func (ip *Iphdr) DestAddr() string { return net.IP(ip.DestIp).String() }

// Len returns the IP total-length field (addrHdr interface).
func (ip *Iphdr) Len() int { return int(ip.Length) }
// Vlanhdr is an 802.1Q VLAN tag header.
type Vlanhdr struct {
	Priority       byte // PCP: 3-bit priority code point
	DropEligible   bool // DEI bit
	VlanIdentifier int  // VID: 12-bit VLAN id
	Type           int  // Not actually part of the vlan header, but the type of the actual packet
}

// String implements fmt.Stringer.
// BUG FIX: the original declared no return type and discarded the
// fmt.Sprintf result, so it produced nothing and never satisfied
// fmt.Stringer (which Packet.headerString relies on).
func (v *Vlanhdr) String() string {
	return fmt.Sprintf("VLAN Priority:%d Drop:%v Tag:%d", v.Priority, v.DropEligible, v.VlanIdentifier)
}
// decodeVlan parses an 802.1Q tag out of p.Payload, appends it to
// p.Headers, and dispatches on the inner EtherType.
func (p *Packet) decodeVlan() {
	pkt := p.Payload
	if len(pkt) < 4 {
		return // shorter than TCI (2 bytes) + inner EtherType (2 bytes)
	}
	vlan := new(Vlanhdr)
	// The 802.1Q TCI lives in pkt[0:2]: PCP is the top 3 bits of pkt[0],
	// DEI the next bit, VID the remaining 12 bits.
	// BUG FIX: the original read byte 2 (part of the inner EtherType) and
	// shifted a byte right by 13, which always produced 0 for both
	// Priority and a wrong DropEligible.
	vlan.Priority = (pkt[0] & 0xE0) >> 5
	vlan.DropEligible = pkt[0]&0x10 != 0
	vlan.VlanIdentifier = int(binary.BigEndian.Uint16(pkt[:2])) & 0x0FFF
	vlan.Type = int(binary.BigEndian.Uint16(pkt[2:4]))
	p.Headers = append(p.Headers, vlan)
	// BUG FIX: was `len(pkt) >= 5`, which left the 4 tag bytes in front
	// of the payload when the tag was the last thing in the frame.
	p.Payload = p.Payload[4:]
	switch vlan.Type {
	case TYPE_IP:
		p.decodeIp()
	case TYPE_IP6:
		p.decodeIp6()
	case TYPE_ARP:
		p.decodeArp()
	}
}
// Tcphdr is a TCP packet header.
type Tcphdr struct {
	SrcPort    uint16
	DestPort   uint16
	Seq        uint32
	Ack        uint32
	DataOffset uint8  // header length in 32-bit words
	Flags      uint16 // TCP_* flag bits
	Window     uint16
	Checksum   uint16
	Urgent     uint16
	Data       []byte
}
// TCP flag bits as laid out in the low 9 bits of the 16-bit word that
// also carries the data offset (TCP_NS is bit 8, in the reserved area).
const (
	TCP_FIN = 1 << iota
	TCP_SYN
	TCP_RST
	TCP_PSH
	TCP_ACK
	TCP_URG
	TCP_ECE
	TCP_CWR
	TCP_NS
)
// decodeTcp parses a TCP header out of p.Payload, appends it to
// p.Headers, sets p.TCP, and advances p.Payload past header + options.
func (p *Packet) decodeTcp() {
	if len(p.Payload) < 20 {
		return // shorter than a minimal TCP header
	}
	pkt := p.Payload
	tcp := new(Tcphdr)
	tcp.SrcPort = binary.BigEndian.Uint16(pkt[0:2])
	tcp.DestPort = binary.BigEndian.Uint16(pkt[2:4])
	tcp.Seq = binary.BigEndian.Uint32(pkt[4:8])
	tcp.Ack = binary.BigEndian.Uint32(pkt[8:12])
	// Data offset is the high nibble of byte 12, counted in 32-bit words.
	tcp.DataOffset = (pkt[12] & 0xF0) >> 4
	// Flags are the low 9 bits of the offset/flags word.
	tcp.Flags = binary.BigEndian.Uint16(pkt[12:14]) & 0x1FF
	tcp.Window = binary.BigEndian.Uint16(pkt[14:16])
	tcp.Checksum = binary.BigEndian.Uint16(pkt[16:18])
	tcp.Urgent = binary.BigEndian.Uint16(pkt[18:20])
	if len(pkt) >= int(tcp.DataOffset*4) {
		p.Payload = pkt[tcp.DataOffset*4:]
	}
	p.Headers = append(p.Headers, tcp)
	p.TCP = tcp
}
// String renders a tcpdump-style TCP summary, using hdr for the
// endpoint addresses and the total length.
func (tcp *Tcphdr) String(hdr addrHdr) string {
	src := fmt.Sprintf("%s:%d", hdr.SrcAddr(), int(tcp.SrcPort))
	dst := fmt.Sprintf("%s:%d", hdr.DestAddr(), int(tcp.DestPort))
	return fmt.Sprintf("TCP %s > %s %s SEQ=%d ACK=%d LEN=%d",
		src, dst, tcp.FlagsString(), int64(tcp.Seq), int64(tcp.Ack), hdr.Len())
}
// FlagsString returns the set TCP flags as a bracketed, space-separated
// list, e.g. "[syn ack]". The order matches the original implementation.
func (tcp *Tcphdr) FlagsString() string {
	flagNames := []struct {
		mask uint16
		name string
	}{
		{TCP_SYN, "syn"},
		{TCP_FIN, "fin"},
		{TCP_ACK, "ack"},
		{TCP_PSH, "psh"},
		{TCP_RST, "rst"},
		{TCP_URG, "urg"},
		{TCP_NS, "ns"},
		{TCP_CWR, "cwr"},
		{TCP_ECE, "ece"},
	}
	var sflags []string
	for _, f := range flagNames {
		if tcp.Flags&f.mask != 0 {
			sflags = append(sflags, f.name)
		}
	}
	return fmt.Sprintf("[%s]", strings.Join(sflags, " "))
}
// Udphdr is a UDP packet header.
type Udphdr struct {
	SrcPort  uint16
	DestPort uint16
	Length   uint16 // header + payload length in bytes
	Checksum uint16
}
// decodeUdp parses a UDP header out of p.Payload, appends it to
// p.Headers, sets p.UDP, and advances p.Payload past the 8 header bytes.
func (p *Packet) decodeUdp() {
	if len(p.Payload) < 8 {
		return // shorter than a UDP header
	}
	pkt := p.Payload
	udp := new(Udphdr)
	udp.SrcPort = binary.BigEndian.Uint16(pkt[0:2])
	udp.DestPort = binary.BigEndian.Uint16(pkt[2:4])
	udp.Length = binary.BigEndian.Uint16(pkt[4:6])
	udp.Checksum = binary.BigEndian.Uint16(pkt[6:8])
	p.Headers = append(p.Headers, udp)
	p.UDP = udp
	if len(p.Payload) >= 8 { // always true after the guard above
		p.Payload = pkt[8:]
	}
}
// String renders a tcpdump-style UDP summary, using hdr for the
// endpoint addresses.
func (udp *Udphdr) String(hdr addrHdr) string {
	src := fmt.Sprintf("%s:%d", hdr.SrcAddr(), int(udp.SrcPort))
	dst := fmt.Sprintf("%s:%d", hdr.DestAddr(), int(udp.DestPort))
	return fmt.Sprintf("UDP %s > %s LEN=%d CHKSUM=%d", src, dst, int(udp.Length), int(udp.Checksum))
}
// Icmphdr is an ICMP packet header.
type Icmphdr struct {
	Type     uint8
	Code     uint8
	Checksum uint16
	Id       uint16
	Seq      uint16
	Data     []byte
}
// decodeIcmp parses an ICMP header out of p.Payload, appends it to
// p.Headers, and returns it (nil if the payload is too short).
func (p *Packet) decodeIcmp() *Icmphdr {
	if len(p.Payload) < 8 {
		return nil
	}
	pkt := p.Payload
	icmp := new(Icmphdr)
	icmp.Type = pkt[0]
	icmp.Code = pkt[1]
	icmp.Checksum = binary.BigEndian.Uint16(pkt[2:4])
	icmp.Id = binary.BigEndian.Uint16(pkt[4:6])
	icmp.Seq = binary.BigEndian.Uint16(pkt[6:8])
	p.Payload = pkt[8:]
	p.Headers = append(p.Headers, icmp)
	return icmp
}
// String renders a one-line ICMP summary, using hdr for the endpoint
// addresses. The trailing space matches the original output format.
func (icmp *Icmphdr) String(hdr addrHdr) string {
	details := fmt.Sprintf("Type = %d Code = %d ", icmp.Type, icmp.Code)
	return fmt.Sprintf("ICMP %s > %s %s", hdr.SrcAddr(), hdr.DestAddr(), details)
}
// TypeString returns a human-readable description for the ICMP
// type/code pair; unknown types yield an empty string.
func (icmp *Icmphdr) TypeString() (result string) {
	switch icmp.Type {
	case 0:
		result = fmt.Sprintf("Echo reply seq=%d", icmp.Seq)
	case 3:
		unreachable := map[uint8]string{
			0: "Network unreachable",
			1: "Host unreachable",
			2: "Protocol unreachable",
			3: "Port unreachable",
		}
		if s, ok := unreachable[icmp.Code]; ok {
			result = s
		} else {
			result = "Destination unreachable"
		}
	case 8:
		result = fmt.Sprintf("Echo request seq=%d", icmp.Seq)
	case 30:
		result = "Traceroute"
	}
	return
}
// Ip6hdr is the fixed header of an IPv6 packet.
type Ip6hdr struct {
	// http://www.networksorcery.com/enp/protocol/ipv6.htm
	Version      uint8  // 4 bits
	TrafficClass uint8  // 8 bits
	FlowLabel    uint32 // 20 bits
	Length       uint16 // 16 bits
	NextHeader   uint8  // 8 bits, same as Protocol in Iphdr
	HopLimit     uint8  // 8 bits
	SrcIp        []byte // 16 bytes
	DestIp       []byte // 16 bytes
}
// decodeIp6 parses an IPv6 header out of p.Payload, appends it to
// p.Headers, and dispatches on NextHeader to the next-layer decoder.
func (p *Packet) decodeIp6() {
	if len(p.Payload) < 40 {
		return // shorter than the fixed IPv6 header
	}
	pkt := p.Payload
	ip6 := new(Ip6hdr)
	ip6.Version = uint8(pkt[0]) >> 4
	// Traffic class straddles the first two bytes (bits 4-11).
	ip6.TrafficClass = uint8((binary.BigEndian.Uint16(pkt[0:2]) >> 4) & 0x00FF)
	// Flow label is the low 20 bits of the first 32-bit word.
	ip6.FlowLabel = binary.BigEndian.Uint32(pkt[0:4]) & 0x000FFFFF
	ip6.Length = binary.BigEndian.Uint16(pkt[4:6])
	ip6.NextHeader = pkt[6]
	ip6.HopLimit = pkt[7]
	ip6.SrcIp = pkt[8:24]
	ip6.DestIp = pkt[24:40]
	if len(p.Payload) >= 40 { // always true after the guard above
		p.Payload = pkt[40:]
	}
	p.Headers = append(p.Headers, ip6)
	switch ip6.NextHeader {
	case IP_TCP:
		p.decodeTcp()
	case IP_UDP:
		p.decodeUdp()
	case IP_ICMP:
		p.decodeIcmp()
	case IP_INIP:
		// Tunneled IPv4 inside IPv6.
		p.decodeIp()
	}
}
// SrcAddr returns the textual source address (addrHdr interface).
func (ip6 *Ip6hdr) SrcAddr() string { return net.IP(ip6.SrcIp).String() }

// DestAddr returns the textual destination address (addrHdr interface).
func (ip6 *Ip6hdr) DestAddr() string { return net.IP(ip6.DestIp).String() }

// Len returns the IPv6 payload-length field (addrHdr interface).
func (ip6 *Ip6hdr) Len() int { return int(ip6.Length) }

View File

@ -0,0 +1,247 @@
package pcap
import (
"bytes"
"testing"
"time"
)
// testSimpleTcpPacket is a captured Ethernet/IPv4/TCP frame carrying an
// HTTP GET request; the expected field values are asserted in
// TestDecodeSimpleTcpPacket below.
var testSimpleTcpPacket *Packet = &Packet{
	Data: []byte{
		0x00, 0x00, 0x0c, 0x9f, 0xf0, 0x20, 0xbc, 0x30, 0x5b, 0xe8, 0xd3, 0x49,
		0x08, 0x00, 0x45, 0x00, 0x01, 0xa4, 0x39, 0xdf, 0x40, 0x00, 0x40, 0x06,
		0x55, 0x5a, 0xac, 0x11, 0x51, 0x49, 0xad, 0xde, 0xfe, 0xe1, 0xc5, 0xf7,
		0x00, 0x50, 0xc5, 0x7e, 0x0e, 0x48, 0x49, 0x07, 0x42, 0x32, 0x80, 0x18,
		0x00, 0x73, 0xab, 0xb1, 0x00, 0x00, 0x01, 0x01, 0x08, 0x0a, 0x03, 0x77,
		0x37, 0x9c, 0x42, 0x77, 0x5e, 0x3a, 0x47, 0x45, 0x54, 0x20, 0x2f, 0x20,
		0x48, 0x54, 0x54, 0x50, 0x2f, 0x31, 0x2e, 0x31, 0x0d, 0x0a, 0x48, 0x6f,
		0x73, 0x74, 0x3a, 0x20, 0x77, 0x77, 0x77, 0x2e, 0x66, 0x69, 0x73, 0x68,
		0x2e, 0x63, 0x6f, 0x6d, 0x0d, 0x0a, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63,
		0x74, 0x69, 0x6f, 0x6e, 0x3a, 0x20, 0x6b, 0x65, 0x65, 0x70, 0x2d, 0x61,
		0x6c, 0x69, 0x76, 0x65, 0x0d, 0x0a, 0x55, 0x73, 0x65, 0x72, 0x2d, 0x41,
		0x67, 0x65, 0x6e, 0x74, 0x3a, 0x20, 0x4d, 0x6f, 0x7a, 0x69, 0x6c, 0x6c,
		0x61, 0x2f, 0x35, 0x2e, 0x30, 0x20, 0x28, 0x58, 0x31, 0x31, 0x3b, 0x20,
		0x4c, 0x69, 0x6e, 0x75, 0x78, 0x20, 0x78, 0x38, 0x36, 0x5f, 0x36, 0x34,
		0x29, 0x20, 0x41, 0x70, 0x70, 0x6c, 0x65, 0x57, 0x65, 0x62, 0x4b, 0x69,
		0x74, 0x2f, 0x35, 0x33, 0x35, 0x2e, 0x32, 0x20, 0x28, 0x4b, 0x48, 0x54,
		0x4d, 0x4c, 0x2c, 0x20, 0x6c, 0x69, 0x6b, 0x65, 0x20, 0x47, 0x65, 0x63,
		0x6b, 0x6f, 0x29, 0x20, 0x43, 0x68, 0x72, 0x6f, 0x6d, 0x65, 0x2f, 0x31,
		0x35, 0x2e, 0x30, 0x2e, 0x38, 0x37, 0x34, 0x2e, 0x31, 0x32, 0x31, 0x20,
		0x53, 0x61, 0x66, 0x61, 0x72, 0x69, 0x2f, 0x35, 0x33, 0x35, 0x2e, 0x32,
		0x0d, 0x0a, 0x41, 0x63, 0x63, 0x65, 0x70, 0x74, 0x3a, 0x20, 0x74, 0x65,
		0x78, 0x74, 0x2f, 0x68, 0x74, 0x6d, 0x6c, 0x2c, 0x61, 0x70, 0x70, 0x6c,
		0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2f, 0x78, 0x68, 0x74, 0x6d,
		0x6c, 0x2b, 0x78, 0x6d, 0x6c, 0x2c, 0x61, 0x70, 0x70, 0x6c, 0x69, 0x63,
		0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2f, 0x78, 0x6d, 0x6c, 0x3b, 0x71, 0x3d,
		0x30, 0x2e, 0x39, 0x2c, 0x2a, 0x2f, 0x2a, 0x3b, 0x71, 0x3d, 0x30, 0x2e,
		0x38, 0x0d, 0x0a, 0x41, 0x63, 0x63, 0x65, 0x70, 0x74, 0x2d, 0x45, 0x6e,
		0x63, 0x6f, 0x64, 0x69, 0x6e, 0x67, 0x3a, 0x20, 0x67, 0x7a, 0x69, 0x70,
		0x2c, 0x64, 0x65, 0x66, 0x6c, 0x61, 0x74, 0x65, 0x2c, 0x73, 0x64, 0x63,
		0x68, 0x0d, 0x0a, 0x41, 0x63, 0x63, 0x65, 0x70, 0x74, 0x2d, 0x4c, 0x61,
		0x6e, 0x67, 0x75, 0x61, 0x67, 0x65, 0x3a, 0x20, 0x65, 0x6e, 0x2d, 0x55,
		0x53, 0x2c, 0x65, 0x6e, 0x3b, 0x71, 0x3d, 0x30, 0x2e, 0x38, 0x0d, 0x0a,
		0x41, 0x63, 0x63, 0x65, 0x70, 0x74, 0x2d, 0x43, 0x68, 0x61, 0x72, 0x73,
		0x65, 0x74, 0x3a, 0x20, 0x49, 0x53, 0x4f, 0x2d, 0x38, 0x38, 0x35, 0x39,
		0x2d, 0x31, 0x2c, 0x75, 0x74, 0x66, 0x2d, 0x38, 0x3b, 0x71, 0x3d, 0x30,
		0x2e, 0x37, 0x2c, 0x2a, 0x3b, 0x71, 0x3d, 0x30, 0x2e, 0x33, 0x0d, 0x0a,
		0x0d, 0x0a,
	}}
// BenchmarkDecodeSimpleTcpPacket measures Decode on a captured HTTP GET.
func BenchmarkDecodeSimpleTcpPacket(b *testing.B) {
	for i := 0; i < b.N; i++ {
		// BUG FIX: Decode appends to Headers, so without resetting it the
		// shared packet's Headers slice grew by two entries per iteration
		// and the benchmark increasingly measured slice growth, not Decode.
		testSimpleTcpPacket.Headers = nil
		testSimpleTcpPacket.Decode()
	}
}
// TestDecodeSimpleTcpPacket checks every decoded field of the captured
// HTTP GET frame in testSimpleTcpPacket against known-good values.
func TestDecodeSimpleTcpPacket(t *testing.T) {
	p := testSimpleTcpPacket
	p.Decode()
	if p.DestMac != 0x00000c9ff020 {
		t.Error("Dest mac", p.DestMac)
	}
	if p.SrcMac != 0xbc305be8d349 {
		t.Error("Src mac", p.SrcMac)
	}
	// Expect exactly IP + TCP.
	if len(p.Headers) != 2 {
		t.Error("Incorrect number of headers", len(p.Headers))
		return
	}
	if ip, ipOk := p.Headers[0].(*Iphdr); ipOk {
		if ip.Version != 4 {
			t.Error("ip Version", ip.Version)
		}
		if ip.Ihl != 5 {
			t.Error("ip header length", ip.Ihl)
		}
		if ip.Tos != 0 {
			t.Error("ip TOS", ip.Tos)
		}
		if ip.Length != 420 {
			t.Error("ip Length", ip.Length)
		}
		if ip.Id != 14815 {
			t.Error("ip ID", ip.Id)
		}
		if ip.Flags != 0x02 {
			t.Error("ip Flags", ip.Flags)
		}
		if ip.FragOffset != 0 {
			t.Error("ip Fragoffset", ip.FragOffset)
		}
		if ip.Ttl != 64 {
			t.Error("ip TTL", ip.Ttl)
		}
		if ip.Protocol != 6 {
			t.Error("ip Protocol", ip.Protocol)
		}
		if ip.Checksum != 0x555A {
			t.Error("ip Checksum", ip.Checksum)
		}
		if !bytes.Equal(ip.SrcIp, []byte{172, 17, 81, 73}) {
			t.Error("ip Src", ip.SrcIp)
		}
		if !bytes.Equal(ip.DestIp, []byte{173, 222, 254, 225}) {
			t.Error("ip Dest", ip.DestIp)
		}
		if tcp, tcpOk := p.Headers[1].(*Tcphdr); tcpOk {
			if tcp.SrcPort != 50679 {
				t.Error("tcp srcport", tcp.SrcPort)
			}
			if tcp.DestPort != 80 {
				t.Error("tcp destport", tcp.DestPort)
			}
			if tcp.Seq != 0xc57e0e48 {
				t.Error("tcp seq", tcp.Seq)
			}
			if tcp.Ack != 0x49074232 {
				t.Error("tcp ack", tcp.Ack)
			}
			if tcp.DataOffset != 8 {
				t.Error("tcp dataoffset", tcp.DataOffset)
			}
			if tcp.Flags != 0x18 {
				t.Error("tcp flags", tcp.Flags)
			}
			if tcp.Window != 0x73 {
				t.Error("tcp window", tcp.Window)
			}
			if tcp.Checksum != 0xabb1 {
				t.Error("tcp checksum", tcp.Checksum)
			}
			if tcp.Urgent != 0 {
				t.Error("tcp urgent", tcp.Urgent)
			}
		} else {
			t.Error("Second header is not TCP header")
		}
	} else {
		t.Error("First header is not IP header")
	}
	// The remaining payload must be exactly the HTTP request text.
	if string(p.Payload) != "GET / HTTP/1.1\r\nHost: www.fish.com\r\nConnection: keep-alive\r\nUser-Agent: Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/535.2 (KHTML, like Gecko) Chrome/15.0.874.121 Safari/535.2\r\nAccept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8\r\nAccept-Encoding: gzip,deflate,sdch\r\nAccept-Language: en-US,en;q=0.8\r\nAccept-Charset: ISO-8859-1,utf-8;q=0.7,*;q=0.3\r\n\r\n" {
		t.Error("--- PAYLOAD STRING ---\n", string(p.Payload), "\n--- PAYLOAD BYTES ---\n", p.Payload)
	}
}
// Makes sure packet payload doesn't display the 6 trailing null of this packet
// as part of the payload. They're actually the ethernet trailer.
func TestDecodeSmallTcpPacketHasEmptyPayload(t *testing.T) {
	p := &Packet{
		// This packet is only 54 bytes (an empty TCP RST), thus 6 trailing null
		// bytes are added by the ethernet layer to make it the minimum packet size.
		Data: []byte{
			0xbc, 0x30, 0x5b, 0xe8, 0xd3, 0x49, 0xb8, 0xac, 0x6f, 0x92, 0xd5, 0xbf,
			0x08, 0x00, 0x45, 0x00, 0x00, 0x28, 0x00, 0x00, 0x40, 0x00, 0x40, 0x06,
			0x3f, 0x9f, 0xac, 0x11, 0x51, 0xc5, 0xac, 0x11, 0x51, 0x49, 0x00, 0x63,
			0x9a, 0xef, 0x00, 0x00, 0x00, 0x00, 0x2e, 0xc1, 0x27, 0x83, 0x50, 0x14,
			0x00, 0x00, 0xc3, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
		}}
	p.Decode()
	// Payload must be non-nil (decoding happened) but empty (trailer trimmed).
	if p.Payload == nil {
		t.Error("Nil payload")
	}
	if len(p.Payload) != 0 {
		t.Error("Non-empty payload:", p.Payload)
	}
}
// TestDecodeVlanPacket decodes an 802.1Q-tagged TCP frame and checks
// the resulting vlan/IP/TCP header chain.
func TestDecodeVlanPacket(t *testing.T) {
	p := &Packet{
		Data: []byte{
			0x00, 0x10, 0xdb, 0xff, 0x10, 0x00, 0x00, 0x15, 0x2c, 0x9d, 0xcc, 0x00, 0x81, 0x00, 0x01, 0xf7,
			0x08, 0x00, 0x45, 0x00, 0x00, 0x28, 0x29, 0x8d, 0x40, 0x00, 0x7d, 0x06, 0x83, 0xa0, 0xac, 0x1b,
			0xca, 0x8e, 0x45, 0x16, 0x94, 0xe2, 0xd4, 0x0a, 0x00, 0x50, 0xdf, 0xab, 0x9c, 0xc6, 0xcd, 0x1e,
			0xe5, 0xd1, 0x50, 0x10, 0x01, 0x00, 0x5a, 0x74, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
		}}
	p.Decode()
	if p.Type != TYPE_VLAN {
		t.Error("Didn't detect vlan")
	}
	// Expect vlan + IP + TCP.
	if len(p.Headers) != 3 {
		t.Error("Incorrect number of headers:", len(p.Headers))
		for i, h := range p.Headers {
			t.Errorf("Header %d: %#v", i, h)
		}
		t.FailNow()
	}
	if _, ok := p.Headers[0].(*Vlanhdr); !ok {
		t.Errorf("First header isn't vlan: %q", p.Headers[0])
	}
	if _, ok := p.Headers[1].(*Iphdr); !ok {
		t.Errorf("Second header isn't IP: %q", p.Headers[1])
	}
	if _, ok := p.Headers[2].(*Tcphdr); !ok {
		t.Errorf("Third header isn't TCP: %q", p.Headers[2])
	}
}
// TestDecodeFuzzFallout replays malformed inputs previously found by
// fuzzing; the test passes if Decode handles them without panicking.
func TestDecodeFuzzFallout(t *testing.T) {
	testData := []struct {
		Data []byte
	}{
		{[]byte("000000000000\x81\x000")},
		{[]byte("000000000000\x81\x00000")},
		{[]byte("000000000000\x86\xdd0")},
		{[]byte("000000000000\b\x000")},
		{[]byte("000000000000\b\x060")},
		{[]byte{}},
		{[]byte("000000000000\b\x0600000000")},
		{[]byte("000000000000\x86\xdd000000\x01000000000000000000000000000000000")},
		{[]byte("000000000000\x81\x0000\b\x0600000000")},
		{[]byte("000000000000\b\x00n0000000000000000000")},
		{[]byte("000000000000\x86\xdd000000\x0100000000000000000000000000000000000")},
		{[]byte("000000000000\x81\x0000\b\x00g0000000000000000000")},
		//{[]byte()},
		{[]byte("000000000000\b\x00400000000\x110000000000")},
		{[]byte("0nMء\xfe\x13\x13\x81\x00gr\b\x00&x\xc9\xe5b'\x1e0\x00\x04\x00\x0020596224")},
		{[]byte("000000000000\x81\x0000\b\x00400000000\x110000000000")},
		{[]byte("000000000000\b\x00000000000\x0600\xff0000000")},
		{[]byte("000000000000\x86\xdd000000\x06000000000000000000000000000000000")},
		{[]byte("000000000000\x81\x0000\b\x00000000000\x0600b0000000")},
		{[]byte("000000000000\x81\x0000\b\x00400000000\x060000000000")},
		{[]byte("000000000000\x86\xdd000000\x11000000000000000000000000000000000")},
		{[]byte("000000000000\x86\xdd000000\x0600000000000000000000000000000000000000000000M")},
		{[]byte("000000000000\b\x00500000000\x0600000000000")},
		{[]byte("0nM\xd80\xfe\x13\x13\x81\x00gr\b\x00&x\xc9\xe5b'\x1e0\x00\x04\x00\x0020596224")},
	}
	for _, entry := range testData {
		pkt := &Packet{
			Time:   time.Now(),
			Caplen: uint32(len(entry.Data)),
			Len:    uint32(len(entry.Data)),
			Data:   entry.Data,
		}
		pkt.Decode()
		/*
			func() {
				defer func() {
					if err := recover(); err != nil {
						t.Fatalf("%d. %q failed: %v", idx, string(entry.Data), err)
					}
				}()
				pkt.Decode()
			}()
		*/
	}
}

View File

@ -0,0 +1,206 @@
package pcap
import (
"encoding/binary"
"fmt"
"io"
"time"
)
// FileHeader is the parsed header of a pcap file.
// http://wiki.wireshark.org/Development/LibpcapFileFormat
type FileHeader struct {
	MagicNumber  uint32 // 0xa1b2c3d4 (normalized to native order by NewReader)
	VersionMajor uint16
	VersionMinor uint16
	TimeZone     int32 // GMT-to-local correction; usually 0
	SigFigs      uint32
	SnapLen      uint32 // max bytes captured per packet
	Network      uint32 // link-layer type, see LINKTYPE_*
}
type PacketTime struct {
Sec int32
Usec int32
}
// Convert the PacketTime to a go Time struct.
func (p *PacketTime) Time() time.Time {
return time.Unix(int64(p.Sec), int64(p.Usec)*1000)
}
// Packet is a single packet parsed from a pcap file.
//
// Convenient access to IP, TCP, and UDP headers is provided after Decode()
// is called if the packet is of the appropriate type.
type Packet struct {
	Time   time.Time // packet send/receive time
	Caplen uint32    // bytes stored in the file (caplen <= len)
	Len    uint32    // bytes sent/received
	Data   []byte    // packet data

	// The remaining fields are populated by Decode().
	Type    int // protocol type, see LINKTYPE_*
	DestMac uint64
	SrcMac  uint64

	Headers []interface{} // decoded headers, in order
	Payload []byte        // remaining non-header bytes

	IP  *Iphdr  // IP header (for IP packets, after decoding)
	TCP *Tcphdr // TCP header (for TCP packets, after decoding)
	UDP *Udphdr // UDP header (for UDP packets after decoding)
}
// Reader parses pcap files.
type Reader struct {
	flip         bool      // true if the file's byte order differs from native
	buf          io.Reader // underlying data stream
	err          error     // last read error, if any
	fourBytes    []byte    // scratch buffers reused across reads
	twoBytes     []byte
	sixteenBytes []byte
	Header       FileHeader // the parsed file header
}
// NewReader reads pcap data from an io.Reader.
// It consumes and validates the 24-byte file header; an unrecognized
// magic number yields an error.
func NewReader(reader io.Reader) (*Reader, error) {
	r := &Reader{
		buf:          reader,
		fourBytes:    make([]byte, 4),
		twoBytes:     make([]byte, 2),
		sixteenBytes: make([]byte, 16),
	}
	// The magic number doubles as a byte-order marker: a byte-swapped
	// magic means every subsequent field must be flipped.
	switch magic := r.readUint32(); magic {
	case 0xa1b2c3d4:
		r.flip = false
	case 0xd4c3b2a1:
		r.flip = true
	default:
		return nil, fmt.Errorf("pcap: bad magic number: %0x", magic)
	}
	// Fields are stored normalized to native order, so MagicNumber is
	// always the canonical value regardless of the file's byte order.
	r.Header = FileHeader{
		MagicNumber:  0xa1b2c3d4,
		VersionMajor: r.readUint16(),
		VersionMinor: r.readUint16(),
		TimeZone:     r.readInt32(),
		SigFigs:      r.readUint32(),
		SnapLen:      r.readUint32(),
		Network:      r.readUint32(),
	}
	return r, nil
}
// Next returns the next packet or nil if no more packets can be read.
// The read error, if any, is retained in r.err.
func (r *Reader) Next() *Packet {
	d := r.sixteenBytes
	r.err = r.read(d)
	if r.err != nil {
		return nil
	}
	timeSec := asUint32(d[0:4], r.flip)
	timeUsec := asUint32(d[4:8], r.flip)
	capLen := asUint32(d[8:12], r.flip)
	origLen := asUint32(d[12:16], r.flip)
	data := make([]byte, capLen)
	if r.err = r.read(data); r.err != nil {
		return nil
	}
	return &Packet{
		// BUG FIX: the record's sub-second field is in microseconds, but
		// time.Unix takes nanoseconds; scale by 1000 (this matches
		// PacketTime.Time above). The original passed usec directly.
		Time:   time.Unix(int64(timeSec), int64(timeUsec)*1000),
		Caplen: capLen,
		Len:    origLen,
		Data:   data,
	}
}
// read fills data completely from the underlying reader, looping over
// short reads; it returns nil once len(data) bytes have been consumed,
// otherwise the error that stopped it.
func (r *Reader) read(data []byte) error {
	total, err := r.buf.Read(data)
	for err == nil && total < len(data) {
		var n int
		n, err = r.buf.Read(data[total:])
		total += n
	}
	if total == len(data) {
		return nil
	}
	return err
}
// readUint32 reads a 4-byte unsigned integer honoring the file's byte
// order; on failure it records the error in r.err and returns 0.
func (r *Reader) readUint32() uint32 {
	if r.err = r.read(r.fourBytes); r.err != nil {
		return 0
	}
	return asUint32(r.fourBytes, r.flip)
}
// readInt32 reads a 4-byte signed integer honoring the file's byte
// order; on failure it records the error in r.err and returns 0.
func (r *Reader) readInt32() int32 {
	if r.err = r.read(r.fourBytes); r.err != nil {
		return 0
	}
	return int32(asUint32(r.fourBytes, r.flip))
}
// readUint16 reads a 2-byte unsigned integer honoring the file's byte
// order; on failure it records the error in r.err and returns 0.
func (r *Reader) readUint16() uint16 {
	if r.err = r.read(r.twoBytes); r.err != nil {
		return 0
	}
	return asUint16(r.twoBytes, r.flip)
}
// Writer writes a pcap file.
type Writer struct {
	writer io.Writer
	buf    []byte // 24-byte scratch buffer for file and record headers
}
// NewWriter creates a Writer that stores output in an io.Writer.
// The FileHeader is written immediately.
// All on-disk fields are emitted little-endian.
func NewWriter(writer io.Writer, header *FileHeader) (*Writer, error) {
	w := &Writer{
		writer: writer,
		buf:    make([]byte, 24),
	}
	binary.LittleEndian.PutUint32(w.buf, header.MagicNumber)
	binary.LittleEndian.PutUint16(w.buf[4:], header.VersionMajor)
	binary.LittleEndian.PutUint16(w.buf[6:], header.VersionMinor)
	binary.LittleEndian.PutUint32(w.buf[8:], uint32(header.TimeZone))
	binary.LittleEndian.PutUint32(w.buf[12:], header.SigFigs)
	binary.LittleEndian.PutUint32(w.buf[16:], header.SnapLen)
	binary.LittleEndian.PutUint32(w.buf[20:], header.Network)
	if _, err := writer.Write(w.buf); err != nil {
		return nil, err
	}
	return w, nil
}
// Write writes a packet record (16-byte header followed by the captured
// data) to the underlying writer, in little-endian pcap on-disk layout.
func (w *Writer) Write(pkt *Packet) error {
	binary.LittleEndian.PutUint32(w.buf, uint32(pkt.Time.Unix()))
	// BUG FIX: the timestamp's sub-second field is microseconds on disk;
	// the original wrote the raw nanosecond count.
	binary.LittleEndian.PutUint32(w.buf[4:], uint32(pkt.Time.Nanosecond()/1000))
	// BUG FIX: bytes 8-11 hold the captured length (incl_len); the
	// original rewrote the timestamp seconds here, corrupting the file.
	binary.LittleEndian.PutUint32(w.buf[8:], pkt.Caplen)
	binary.LittleEndian.PutUint32(w.buf[12:], pkt.Len)
	if _, err := w.writer.Write(w.buf[:16]); err != nil {
		return err
	}
	_, err := w.writer.Write(pkt.Data)
	return err
}
// asUint32 decodes 4 bytes as a uint32; flip selects big-endian
// (byte-swapped file) instead of the default little-endian.
func asUint32(data []byte, flip bool) uint32 {
	if !flip {
		return binary.LittleEndian.Uint32(data)
	}
	return binary.BigEndian.Uint32(data)
}
// asUint16 decodes 2 bytes as a uint16; flip selects big-endian
// (byte-swapped file) instead of the default little-endian.
func asUint16(data []byte, flip bool) uint16 {
	if !flip {
		return binary.LittleEndian.Uint16(data)
	}
	return binary.BigEndian.Uint16(data)
}

View File

@ -0,0 +1,266 @@
// Interface to both live and offline pcap parsing.
package pcap
/*
#cgo linux LDFLAGS: -lpcap
#cgo freebsd LDFLAGS: -lpcap
#cgo darwin LDFLAGS: -lpcap
#cgo windows CFLAGS: -I C:/WpdPack/Include
#cgo windows,386 LDFLAGS: -L C:/WpdPack/Lib -lwpcap
#cgo windows,amd64 LDFLAGS: -L C:/WpdPack/Lib/x64 -lwpcap
#include <stdlib.h>
#include <pcap.h>
// Workaround for not knowing how to cast to const u_char**
int hack_pcap_next_ex(pcap_t *p, struct pcap_pkthdr **pkt_header,
u_char **pkt_data) {
return pcap_next_ex(p, pkt_header, (const u_char **)pkt_data);
}
*/
import "C"
import (
"errors"
"net"
"syscall"
"time"
"unsafe"
)
// Pcap wraps a live or offline libpcap capture handle.
type Pcap struct {
	cptr *C.pcap_t // underlying pcap_t; owned until Close is called
}

// Stat mirrors libpcap's pcap_stat counters.
type Stat struct {
	PacketsReceived  uint32
	PacketsDropped   uint32
	PacketsIfDropped uint32
}

// Interface describes a capture-capable network interface.
type Interface struct {
	Name        string
	Description string
	Addresses   []IFAddress
	// TODO: add more elements
}

// IFAddress is one address assigned to an Interface.
type IFAddress struct {
	IP      net.IP
	Netmask net.IPMask
	// TODO: add broadcast + PtP dst ?
}
// Next returns the next packet, or nil on timeout/error; it discards
// the status code that NextEx reports.
func (p *Pcap) Next() (pkt *Packet) {
	rv, _ := p.NextEx()
	return rv
}
// Openlive opens a device and returns a *Pcap handler
// snaplen caps the bytes captured per packet; promisc enables
// promiscuous mode; timeout_ms is the libpcap read timeout.
func Openlive(device string, snaplen int32, promisc bool, timeout_ms int32) (handle *Pcap, err error) {
	var buf *C.char
	buf = (*C.char)(C.calloc(ERRBUF_SIZE, 1)) // libpcap error-message buffer
	h := new(Pcap)
	var pro int32
	if promisc {
		pro = 1 // libpcap expects an int flag
	}
	dev := C.CString(device)
	defer C.free(unsafe.Pointer(dev))
	h.cptr = C.pcap_open_live(dev, C.int(snaplen), C.int(pro), C.int(timeout_ms), buf)
	if nil == h.cptr {
		handle = nil
		// Copy the C error text into Go memory before freeing the buffer.
		err = errors.New(C.GoString(buf))
	} else {
		handle = h
	}
	C.free(unsafe.Pointer(buf))
	return
}
// Openoffline opens a saved pcap capture file for reading and returns
// a *Pcap handler.
func Openoffline(file string) (handle *Pcap, err error) {
	var buf *C.char
	buf = (*C.char)(C.calloc(ERRBUF_SIZE, 1)) // libpcap error-message buffer
	h := new(Pcap)
	cf := C.CString(file)
	defer C.free(unsafe.Pointer(cf))
	h.cptr = C.pcap_open_offline(cf, buf)
	if nil == h.cptr {
		handle = nil
		// Copy the C error text into Go memory before freeing the buffer.
		err = errors.New(C.GoString(buf))
	} else {
		handle = h
	}
	C.free(unsafe.Pointer(buf))
	return
}
// NextEx fetches one packet via pcap_next_ex. result is libpcap's
// status code (1 = packet read, 0 = timeout, negative = error/EOF);
// pkt is nil unless a packet buffer was returned.
func (p *Pcap) NextEx() (pkt *Packet, result int32) {
	var pkthdr *C.struct_pcap_pkthdr
	var buf_ptr *C.u_char
	var buf unsafe.Pointer
	result = int32(C.hack_pcap_next_ex(p.cptr, &pkthdr, &buf_ptr))
	buf = unsafe.Pointer(buf_ptr)
	if nil == buf {
		return
	}
	pkt = new(Packet)
	// tv_usec is microseconds; time.Unix takes nanoseconds.
	pkt.Time = time.Unix(int64(pkthdr.ts.tv_sec), int64(pkthdr.ts.tv_usec)*1000)
	pkt.Caplen = uint32(pkthdr.caplen)
	pkt.Len = uint32(pkthdr.len)
	// Copy the data out: libpcap reuses its buffer on the next call.
	pkt.Data = C.GoBytes(buf, C.int(pkthdr.caplen))
	return
}
// Close releases the libpcap handle; the Pcap must not be used afterwards.
func (p *Pcap) Close() {
	C.pcap_close(p.cptr)
}

// Geterror returns the handle's current libpcap error as a Go error.
func (p *Pcap) Geterror() error {
	return errors.New(C.GoString(C.pcap_geterr(p.cptr)))
}
// Getstats returns the capture counters (received / dropped / dropped
// by the interface) via pcap_stats.
func (p *Pcap) Getstats() (stat *Stat, err error) {
	var cstats _Ctype_struct_pcap_stat
	if -1 == C.pcap_stats(p.cptr, &cstats) {
		return nil, p.Geterror()
	}
	stats := new(Stat)
	stats.PacketsReceived = uint32(cstats.ps_recv)
	stats.PacketsDropped = uint32(cstats.ps_drop)
	stats.PacketsIfDropped = uint32(cstats.ps_ifdrop)
	return stats, nil
}
// Setfilter compiles expr as a BPF filter expression and installs it
// on the capture handle.
func (p *Pcap) Setfilter(expr string) (err error) {
	var bpf _Ctype_struct_bpf_program
	cexpr := C.CString(expr)
	defer C.free(unsafe.Pointer(cexpr))
	if -1 == C.pcap_compile(p.cptr, &bpf, cexpr, 1, 0) {
		return p.Geterror()
	}
	if -1 == C.pcap_setfilter(p.cptr, &bpf) {
		C.pcap_freecode(&bpf)
		return p.Geterror()
	}
	// The compiled program is no longer needed once installed.
	C.pcap_freecode(&bpf)
	return nil
}
// Version returns the libpcap version string.
func Version() string {
	return C.GoString(C.pcap_lib_version())
}

// Datalink returns the handle's link-layer type (a LINKTYPE_*/DLT_* value).
func (p *Pcap) Datalink() int {
	return int(C.pcap_datalink(p.cptr))
}

// Setdatalink sets the handle's link-layer type.
func (p *Pcap) Setdatalink(dlt int) error {
	if -1 == C.pcap_set_datalink(p.cptr, C.int(dlt)) {
		return p.Geterror()
	}
	return nil
}
// DatalinkValueToName returns libpcap's short name for a link-layer
// type, or "" if it is unknown.
func DatalinkValueToName(dlt int) string {
	if name := C.pcap_datalink_val_to_name(C.int(dlt)); name != nil {
		return C.GoString(name)
	}
	return ""
}

// DatalinkValueToDescription returns libpcap's human-readable
// description for a link-layer type, or "" if it is unknown.
func DatalinkValueToDescription(dlt int) string {
	if desc := C.pcap_datalink_val_to_description(C.int(dlt)); desc != nil {
		return C.GoString(desc)
	}
	return ""
}
// Findalldevs lists all capture-capable interfaces via pcap_findalldevs.
func Findalldevs() (ifs []Interface, err error) {
	var buf *C.char
	buf = (*C.char)(C.calloc(ERRBUF_SIZE, 1)) // libpcap error-message buffer
	defer C.free(unsafe.Pointer(buf))
	var alldevsp *C.pcap_if_t
	if -1 == C.pcap_findalldevs((**C.pcap_if_t)(&alldevsp), buf) {
		return nil, errors.New(C.GoString(buf))
	}
	defer C.pcap_freealldevs((*C.pcap_if_t)(alldevsp))
	dev := alldevsp
	var i uint32
	// First pass: count the devices so the slice can be sized exactly.
	for i = 0; dev != nil; dev = (*C.pcap_if_t)(dev.next) {
		i++
	}
	ifs = make([]Interface, i)
	dev = alldevsp
	// Second pass: copy each entry into Go-owned memory (the C list is
	// released by the deferred pcap_freealldevs).
	for j := uint32(0); dev != nil; dev = (*C.pcap_if_t)(dev.next) {
		var iface Interface
		iface.Name = C.GoString(dev.name)
		iface.Description = C.GoString(dev.description)
		iface.Addresses = findalladdresses(dev.addresses)
		// TODO: add more elements
		ifs[j] = iface
		j++
	}
	return
}
// findalladdresses converts libpcap's linked list of pcap_addr entries into
// a slice of IFAddress values, keeping only entries whose address family
// sockaddr_to_IP understands (IPv4 and IPv6).
func findalladdresses(addresses *_Ctype_struct_pcap_addr) (retval []IFAddress) {
	// TODO - make it support more than IPv4 and IPv6?
	retval = make([]IFAddress, 0, 1)
	for curaddr := addresses; curaddr != nil; curaddr = (*_Ctype_struct_pcap_addr)(curaddr.next) {
		var a IFAddress
		var err error
		if a.IP, err = sockaddr_to_IP((*syscall.RawSockaddr)(unsafe.Pointer(curaddr.addr))); err != nil {
			continue
		}
		// BUG FIX: read the netmask from curaddr.netmask. The original read
		// curaddr.addr a second time, so Netmask was always identical to IP.
		if a.Netmask, err = sockaddr_to_IP((*syscall.RawSockaddr)(unsafe.Pointer(curaddr.netmask))); err != nil {
			continue
		}
		retval = append(retval, a)
	}
	return
}
func sockaddr_to_IP(rsa *syscall.RawSockaddr) (IP []byte, err error) {
switch rsa.Family {
case syscall.AF_INET:
pp := (*syscall.RawSockaddrInet4)(unsafe.Pointer(rsa))
IP = make([]byte, 4)
for i := 0; i < len(IP); i++ {
IP[i] = pp.Addr[i]
}
return
case syscall.AF_INET6:
pp := (*syscall.RawSockaddrInet6)(unsafe.Pointer(rsa))
IP = make([]byte, 16)
for i := 0; i < len(IP); i++ {
IP[i] = pp.Addr[i]
}
return
}
err = errors.New("Unsupported address type")
return
}
// Inject transmits the raw packet bytes in data on the interface this
// handle is attached to, via pcap_sendpacket.
func (p *Pcap) Inject(data []byte) (err error) {
	// Copy the payload into C-allocated memory byte by byte so libpcap never
	// holds a pointer into Go-managed memory.
	buf := (*C.char)(C.malloc((C.size_t)(len(data))))
	for i := 0; i < len(data); i++ {
		*(*byte)(unsafe.Pointer(uintptr(unsafe.Pointer(buf)) + uintptr(i))) = data[i]
	}
	if -1 == C.pcap_sendpacket(p.cptr, (*C.u_char)(unsafe.Pointer(buf)), (C.int)(len(data))) {
		err = p.Geterror()
	}
	C.free(unsafe.Pointer(buf))
	return
}

View File

@ -0,0 +1,49 @@
package main
import (
"flag"
"fmt"
"os"
"runtime/pprof"
"time"
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/akrennmair/gopcap"
)
// main benchmarks reading (and, with -d, decoding) every packet in the pcap
// file named by -file, reporting total and per-packet processing time and
// the number of nil packets. -cpuprofile writes a pprof CPU profile.
func main() {
	filename := flag.String("file", "", "filename")
	decode := flag.Bool("d", false, "If true, decode each packet")
	cpuprofile := flag.String("cpuprofile", "", "filename")
	flag.Parse()
	h, err := pcap.Openoffline(*filename)
	if err != nil {
		fmt.Printf("Couldn't create pcap reader: %v", err)
		// BUG FIX: the original fell through and dereferenced the nil handle.
		os.Exit(1)
	}
	if *cpuprofile != "" {
		if out, err := os.Create(*cpuprofile); err == nil {
			pprof.StartCPUProfile(out)
			defer func() {
				pprof.StopCPUProfile()
				out.Close()
			}()
		} else {
			panic(err)
		}
	}
	i, nilPackets := 0, 0
	start := time.Now()
	// NextEx returns -2 when the capture file is exhausted.
	for pkt, code := h.NextEx(); code != -2; pkt, code = h.NextEx() {
		if pkt == nil {
			nilPackets++
		} else if *decode {
			pkt.Decode()
		}
		i++
	}
	duration := time.Since(start)
	// BUG FIX: guard against division by zero on an empty capture.
	perPacket := time.Duration(0)
	if i > 0 {
		perPacket = duration / time.Duration(i)
	}
	fmt.Printf("Took %v to process %v packets, %v per packet, %d nil packets\n",
		duration, i, perPacket, nilPackets)
}

View File

@ -0,0 +1,96 @@
package main
// Parses a pcap file, writes it back to disk, then verifies the files
// are the same.
import (
"bufio"
"flag"
"fmt"
"io"
"os"
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/akrennmair/gopcap"
)
// Command-line flags: source and destination pcap paths, plus whether to
// print each decoded packet while copying.
var input *string = flag.String("input", "", "input file")
var output *string = flag.String("output", "", "output file")
var decode *bool = flag.Bool("decode", false, "print decoded packets")
// copyPcap reads every packet from the pcap file src and writes it to a new
// pcap file dest, optionally printing each decoded packet when -decode is
// set. Failures are reported to stdout and abort the copy.
func copyPcap(dest, src string) {
	f, err := os.Open(src)
	if err != nil {
		fmt.Printf("couldn't open %q: %v\n", src, err)
		return
	}
	defer f.Close()
	reader, err := pcap.NewReader(bufio.NewReader(f))
	if err != nil {
		fmt.Printf("couldn't create reader: %v\n", err)
		return
	}
	w, err := os.Create(dest)
	if err != nil {
		fmt.Printf("couldn't open %q: %v\n", dest, err)
		return
	}
	defer w.Close()
	buf := bufio.NewWriter(w)
	// The destination reuses the source file's global pcap header.
	writer, err := pcap.NewWriter(buf, &reader.Header)
	if err != nil {
		fmt.Printf("couldn't create writer: %v\n", err)
		return
	}
	for {
		// A nil packet marks end of file.
		pkt := reader.Next()
		if pkt == nil {
			break
		}
		if *decode {
			pkt.Decode()
			fmt.Println(pkt.String())
		}
		// NOTE(review): any error from Write is ignored here; a write failure
		// only surfaces via the later byte-for-byte check — confirm intended.
		writer.Write(pkt)
	}
	buf.Flush()
}
// check compares dest and src byte-for-byte, printing PASS if they are
// identical and FAIL otherwise — including when the files differ in length
// or a read fails partway through.
func check(dest, src string) {
	f, err := os.Open(src)
	if err != nil {
		fmt.Printf("couldn't open %q: %v\n", src, err)
		return
	}
	defer f.Close()
	freader := bufio.NewReader(f)
	g, err := os.Open(dest)
	if err != nil {
		// BUG FIX: the original printed src here instead of dest.
		fmt.Printf("couldn't open %q: %v\n", dest, err)
		return
	}
	defer g.Close()
	greader := bufio.NewReader(g)
	for {
		fb, ferr := freader.ReadByte()
		gb, gerr := greader.ReadByte()
		if ferr == io.EOF && gerr == io.EOF {
			break
		}
		// BUG FIX: if only one reader hit EOF (length mismatch) or a read
		// failed, the original compared the zero-value bytes and could loop
		// forever or even print PASS; treat any one-sided error as a failure.
		if ferr != nil || gerr != nil || fb != gb {
			fmt.Println("FAIL")
			return
		}
	}
	fmt.Println("PASS")
}
// main copies the pcap file named by -input to -output, then verifies the
// two files are byte-identical.
func main() {
	flag.Parse()
	copyPcap(*output, *input)
	check(*output, *input)
}

View File

@ -0,0 +1,82 @@
package main
import (
"flag"
"fmt"
"time"
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/akrennmair/gopcap"
)
// min returns the smaller of the two uint32 arguments.
func min(x uint32, y uint32) uint32 {
	if y < x {
		return y
	}
	return x
}
// main opens a live capture (-d device) or a capture file (-r file),
// optionally applies a BPF filter (-e expr), and dumps each packet's
// timestamp, lengths, and printable payload bytes.
func main() {
	device := flag.String("d", "", "device")
	file := flag.String("r", "", "file")
	expr := flag.String("e", "", "filter expression")
	flag.Parse()
	var h *pcap.Pcap
	var err error
	ifs, err := pcap.Findalldevs()
	if len(ifs) == 0 {
		fmt.Printf("Warning: no devices found : %s\n", err)
	} else {
		for i := 0; i < len(ifs); i++ {
			fmt.Printf("dev %d: %s (%s)\n", i+1, ifs[i].Name, ifs[i].Description)
		}
	}
	if *device != "" {
		h, err = pcap.Openlive(*device, 65535, true, 0)
		if h == nil {
			fmt.Printf("Openlive(%s) failed: %s\n", *device, err)
			return
		}
	} else if *file != "" {
		h, err = pcap.Openoffline(*file)
		if h == nil {
			fmt.Printf("Openoffline(%s) failed: %s\n", *file, err)
			return
		}
	} else {
		fmt.Printf("usage: pcaptest [-d <device> | -r <file>]\n")
		return
	}
	defer h.Close()
	fmt.Printf("pcap version: %s\n", pcap.Version())
	if *expr != "" {
		fmt.Printf("Setting filter: %s\n", *expr)
		// A failed filter only warns; capture proceeds unfiltered.
		err := h.Setfilter(*expr)
		if err != nil {
			fmt.Printf("Warning: setting filter failed: %s\n", err)
		}
	}
	for pkt := h.Next(); pkt != nil; pkt = h.Next() {
		// BUG FIX: the original printed pkt.Time.Second() (seconds within
		// the minute, 0-59) where a Unix timestamp was intended, and fed
		// full nanoseconds to a %06d column; use Unix() and microseconds.
		fmt.Printf("time: %d.%06d (%s) caplen: %d len: %d\nData:",
			pkt.Time.Unix(), int64(pkt.Time.Nanosecond()/1000),
			time.Unix(pkt.Time.Unix(), 0).String(), int64(pkt.Caplen), int64(pkt.Len))
		for i := uint32(0); i < pkt.Caplen; i++ {
			if i%32 == 0 {
				fmt.Printf("\n")
			}
			// Printable ASCII is shown verbatim; everything else as '.'.
			if 32 <= pkt.Data[i] && pkt.Data[i] <= 126 {
				fmt.Printf("%c", pkt.Data[i])
			} else {
				fmt.Printf(".")
			}
		}
		fmt.Printf("\n\n")
	}
}

View File

@ -0,0 +1,121 @@
package main
import (
"bufio"
"flag"
"fmt"
"os"
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/akrennmair/gopcap"
)
// Ethernet payload types (EtherType values) and IP protocol numbers used
// when decoding captured packets.
const (
	TYPE_IP  = 0x0800
	TYPE_ARP = 0x0806
	TYPE_IP6 = 0x86DD

	IP_ICMP = 1
	IP_INIP = 4
	IP_TCP  = 6
	IP_UDP  = 17
)
// Buffered writers wrapping stdout and stderr; both are initialized in
// main and flushed explicitly after each burst of output.
var out *bufio.Writer
var errout *bufio.Writer
// main implements a minimal tcpdump clone: it opens a live capture on the
// interface named by -i (defaulting to the first device libpcap reports),
// optionally applies a BPF filter from the first positional argument, and
// prints each decoded packet, with a hexdump when -X is set.
func main() {
	var device *string = flag.String("i", "", "interface")
	var snaplen *int = flag.Int("s", 65535, "snaplen")
	var hexdump *bool = flag.Bool("X", false, "hexdump")
	expr := ""
	out = bufio.NewWriter(os.Stdout)
	errout = bufio.NewWriter(os.Stderr)
	flag.Usage = func() {
		fmt.Fprintf(errout, "usage: %s [ -i interface ] [ -s snaplen ] [ -X ] [ expression ]\n", os.Args[0])
		errout.Flush()
		os.Exit(1)
	}
	flag.Parse()
	if len(flag.Args()) > 0 {
		expr = flag.Arg(0)
	}
	if *device == "" {
		// No interface given: fall back to the first device libpcap knows.
		devs, err := pcap.Findalldevs()
		if err != nil {
			fmt.Fprintf(errout, "tcpdump: couldn't find any devices: %s\n", err)
		}
		if 0 == len(devs) {
			flag.Usage()
		}
		*device = devs[0].Name
	}
	h, err := pcap.Openlive(*device, int32(*snaplen), true, 0)
	if h == nil {
		fmt.Fprintf(errout, "tcpdump: %s\n", err)
		errout.Flush()
		return
	}
	defer h.Close()
	if expr != "" {
		// A failed filter is reported but capture continues unfiltered.
		ferr := h.Setfilter(expr)
		if ferr != nil {
			fmt.Fprintf(out, "tcpdump: %s\n", ferr)
			out.Flush()
		}
	}
	for pkt := h.Next(); pkt != nil; pkt = h.Next() {
		pkt.Decode()
		fmt.Fprintf(out, "%s\n", pkt.String())
		if *hexdump {
			Hexdump(pkt)
		}
		out.Flush()
	}
}
// min returns the smaller of a and b.
func min(a, b int) int {
	if b < a {
		return b
	}
	return a
}
// Hexdump writes a 16-bytes-per-row hex+ASCII dump of the packet's raw
// data to the buffered stdout writer, one Dumpline call per row.
func Hexdump(pkt *pcap.Packet) {
	for i := 0; i < len(pkt.Data); i += 16 {
		Dumpline(uint32(i), pkt.Data[i:min(i+16, len(pkt.Data))])
	}
}
// Dumpline prints one hexdump row to out: the byte offset addr, up to 16
// bytes as hex pairs grouped in twos, padding for short rows so columns
// align, then a printable-ASCII rendering of the same bytes.
func Dumpline(addr uint32, line []byte) {
	fmt.Fprintf(out, "\t0x%04x: ", int32(addr))
	var i uint16
	for i = 0; i < 16 && i < uint16(len(line)); i++ {
		if i%2 == 0 {
			out.WriteString(" ")
		}
		fmt.Fprintf(out, "%02x", line[i])
	}
	// Pad out the hex column for rows shorter than 16 bytes.
	for j := i; j <= 16; j++ {
		if j%2 == 0 {
			out.WriteString(" ")
		}
		out.WriteString(" ")
	}
	out.WriteString(" ")
	// ASCII column: printable bytes verbatim, everything else as '.'.
	for i = 0; i < 16 && i < uint16(len(line)); i++ {
		if line[i] >= 32 && line[i] <= 126 {
			fmt.Fprintf(out, "%c", line[i])
		} else {
			out.WriteString(".")
		}
	}
	out.WriteString("\n")
}

View File

@ -0,0 +1,5 @@
language: go
go:
- 1.4
- tip

View File

@ -0,0 +1,75 @@
// Copyright 2014 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License. See the AUTHORS file
// for names of contributors.
//
// Author: Tyler Neely (t@jujit.su)
package loghisto
import (
"bytes"
"fmt"
"os"
"strings"
)
// graphiteStat is one metric observation flattened for Graphite submission:
// the metric name, a Unix timestamp, the value, and the reporting host.
type graphiteStat struct {
	Metric string
	Time   int64
	Value  float64
	Host   string
}

// graphiteStatArray is a batch of stats serializable as a single request.
type graphiteStatArray []*graphiteStat
// ToRequest serializes the stats into Graphite's plaintext protocol, one
// "cockroach.<host>.<metric> <value> <timestamp>" line per stat, rewriting
// underscores in metric names as dots to form a Graphite path.
func (stats graphiteStatArray) ToRequest() []byte {
	var request bytes.Buffer
	for _, stat := range stats {
		// Format directly into the buffer instead of allocating an
		// intermediate string and byte slice for every stat.
		fmt.Fprintf(&request, "cockroach.%s.%s %f %d\n",
			stat.Host,
			strings.Replace(stat.Metric, "_", ".", -1),
			stat.Value,
			stat.Time,
		)
	}
	// Bytes() avoids the String()->[]byte round trip, which copied the
	// whole payload twice.
	return request.Bytes()
}
// tographiteStats flattens a ProcessedMetricSet into graphiteStats tagged
// with this machine's hostname ("unknown" when the lookup fails). All stats
// share the metric set's collection timestamp.
func (metricSet *ProcessedMetricSet) tographiteStats() graphiteStatArray {
	hostname, err := os.Hostname()
	if err != nil {
		hostname = "unknown"
	}
	stats := make([]*graphiteStat, 0, len(metricSet.Metrics))
	// (The original also kept an `i` counter that was never read; removed.)
	for metric, value := range metricSet.Metrics {
		//TODO(tyler) custom tags
		stats = append(stats, &graphiteStat{
			Metric: metric,
			Time:   metricSet.Time.Unix(),
			Value:  value,
			Host:   hostname,
		})
	}
	return stats
}
// GraphiteProtocol generates a wire representation of a ProcessedMetricSet
// for submission to a Graphite Carbon instance using the plaintext protocol.
// Each metric becomes one line; see graphiteStatArray.ToRequest.
func GraphiteProtocol(ms *ProcessedMetricSet) []byte {
	return ms.tographiteStats().ToRequest()
}

View File

@ -0,0 +1,23 @@
package loghisto
import (
"testing"
"time"
)
// TestGraphite exercises the full Graphite submission path: serialize a
// small ProcessedMetricSet with GraphiteProtocol and submit it via a
// Submitter pointed at a local TCP endpoint.
// NOTE(review): nothing in this test listens on localhost:7777, so it
// effectively checks that serialization and submission do not panic rather
// than that bytes arrive — confirm that is the intent.
func TestGraphite(t *testing.T) {
	ms := NewMetricSystem(time.Second, true)
	s := NewSubmitter(ms, GraphiteProtocol, "tcp", "localhost:7777")
	s.Start()
	metrics := &ProcessedMetricSet{
		Time: time.Now(),
		Metrics: map[string]float64{
			"test.3": 50.54,
			"test.4": 10.21,
		},
	}
	request := s.serializer(metrics)
	s.submit(request)
	s.Shutdown()
}

View File

@ -0,0 +1,653 @@
// Copyright 2014 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License. See the AUTHORS file
// for names of contributors.
//
// Author: Tyler Neely (t@jujit.su)
// IMPORTANT: only subscribe to the metric stream
// using buffered channels that are regularly
// flushed, as reaper will NOT block while trying
// to send metrics to a subscriber, and will ignore
// a subscriber if they fail to clear their channel
// 3 times in a row!
package loghisto
import (
"errors"
"fmt"
"math"
"runtime"
"sort"
"sync"
"sync/atomic"
"time"
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/golang/glog"
)
const (
	// precision effects the bucketing used during histogram value compression.
	// Larger values produce finer-grained buckets (see compress/decompress).
	precision = 100
)

// ProcessedMetricSet contains human-readable metrics that may also be
// suitable for storage in time-series databases.
type ProcessedMetricSet struct {
	Time    time.Time
	Metrics map[string]float64
}

// RawMetricSet contains metrics in a form that supports generation of
// percentiles and other rich statistics.
type RawMetricSet struct {
	Time time.Time
	// Counters holds cumulative totals; Rates holds this interval's deltas.
	Counters map[string]uint64
	Rates    map[string]uint64
	// Histograms maps metric name -> compressed bucket -> occurrence count.
	Histograms map[string]map[int16]*uint64
	Gauges     map[string]float64
}

// TimerToken facilitates concurrent timings of durations of the same label.
type TimerToken struct {
	Name         string
	Start        time.Time
	MetricSystem *MetricSystem
}

// proportion is a compact value with a corresponding count of
// occurrences in this interval.
type proportion struct {
	Value float64
	Count uint64
}

// proportionArray is a sortable collection of proportion types.
type proportionArray []proportion
// MetricSystem facilitates the collection and distribution of metrics.
type MetricSystem struct {
	// percentiles is a mapping from labels to desired percentiles to be
	// calculated by the MetricSystem
	percentiles map[string]float64
	// interval is the duration between collections and broadcasts of metrics
	// to subscribers.
	interval time.Duration
	// subscribeToRawMetrics allows subscription to a RawMetricSet generated
	// by reaper at the end of each interval on a sent channel.
	subscribeToRawMetrics chan chan *RawMetricSet
	// unsubscribeFromRawMetrics allows subscribers to unsubscribe from
	// receiving a RawMetricSet on the sent channel.
	unsubscribeFromRawMetrics chan chan *RawMetricSet
	// subscribeToProcessedMetrics allows subscription to a ProcessedMetricSet
	// generated by reaper at the end of each interval on a sent channel.
	subscribeToProcessedMetrics chan chan *ProcessedMetricSet
	// unsubscribeFromProcessedMetrics allows subscribers to unsubscribe from
	// receiving a ProcessedMetricSet on the sent channel.
	unsubscribeFromProcessedMetrics chan chan *ProcessedMetricSet
	// rawSubscribers stores current subscribers to RawMetrics
	rawSubscribers map[chan *RawMetricSet]struct{}
	// rawBadSubscribers tracks misbehaving subscribers who do not clear their
	// subscription channels regularly.
	rawBadSubscribers map[chan *RawMetricSet]int
	// processedSubscribers stores current subscribers to ProcessedMetrics
	processedSubscribers map[chan *ProcessedMetricSet]struct{}
	// processedBadSubscribers tracks misbehaving subscribers who do not clear
	// their subscription channels regularly.
	processedBadSubscribers map[chan *ProcessedMetricSet]int
	// subscribersMu controls access to subscription structures
	subscribersMu sync.RWMutex
	// counterStore maintains the total counts of counters.
	counterStore map[string]*uint64
	// counterStoreMu controls access to counterStore.
	counterStoreMu sync.RWMutex
	// counterCache aggregates new Counters until they are collected by reaper().
	counterCache map[string]*uint64
	// counterMu controls access to counterCache.
	counterMu sync.RWMutex
	// histogramCache aggregates Histograms until they are collected by reaper().
	histogramCache map[string]map[int16]*uint64
	// histogramMu controls access to histogramCache.
	histogramMu sync.RWMutex
	// histogramCountStore keeps track of aggregate counts and sums for aggregate
	// mean calculation.
	histogramCountStore map[string]*uint64
	// histogramCountMu controls access to the histogramCountStore.
	histogramCountMu sync.RWMutex
	// gaugeFuncs maps metrics to functions used for calculating their value
	gaugeFuncs map[string]func() float64
	// gaugeFuncsMu controls access to the gaugeFuncs map.
	gaugeFuncsMu sync.Mutex
	// Has reaper() been started?
	reaping bool
	// Close this to bring down this MetricSystem; reaper() watches it.
	shutdownChan chan struct{}
}
// Metrics is the default metric system, which collects and broadcasts metrics
// to subscribers once every 60 seconds. Also includes default system stats.
// It is not started automatically; call Metrics.Start() to begin reaping.
var Metrics = NewMetricSystem(60*time.Second, true)
// NewMetricSystem returns a new metric system that collects and broadcasts
// metrics after each interval. When sysStats is true, gauges for Go runtime
// memory and goroutine statistics are pre-registered.
func NewMetricSystem(interval time.Duration, sysStats bool) *MetricSystem {
	ms := &MetricSystem{
		// Each key is a fmt template; the metric name is substituted for %s
		// when percentiles are reported (see processHistograms).
		percentiles: map[string]float64{
			"%s_min":   0,
			"%s_50":    .5,
			"%s_75":    .75,
			"%s_90":    .9,
			"%s_95":    .95,
			"%s_99":    .99,
			"%s_99.9":  .999,
			"%s_99.99": .9999,
			"%s_max":   1,
		},
		interval: interval,
		// Buffered so (un)subscribe requests do not block callers before
		// reaper drains them.
		subscribeToRawMetrics:           make(chan chan *RawMetricSet, 64),
		unsubscribeFromRawMetrics:       make(chan chan *RawMetricSet, 64),
		subscribeToProcessedMetrics:     make(chan chan *ProcessedMetricSet, 64),
		unsubscribeFromProcessedMetrics: make(chan chan *ProcessedMetricSet, 64),
		rawSubscribers:                  make(map[chan *RawMetricSet]struct{}),
		rawBadSubscribers:               make(map[chan *RawMetricSet]int),
		processedSubscribers:            make(map[chan *ProcessedMetricSet]struct{}),
		processedBadSubscribers:         make(map[chan *ProcessedMetricSet]int),
		counterStore:                    make(map[string]*uint64),
		counterCache:                    make(map[string]*uint64),
		histogramCache:                  make(map[string]map[int16]*uint64),
		histogramCountStore:             make(map[string]*uint64),
		gaugeFuncs:                      make(map[string]func() float64),
		shutdownChan:                    make(chan struct{}),
	}
	if sysStats {
		ms.gaugeFuncsMu.Lock()
		ms.gaugeFuncs["sys.Alloc"] = func() float64 {
			memStats := new(runtime.MemStats)
			runtime.ReadMemStats(memStats)
			return float64(memStats.Alloc)
		}
		ms.gaugeFuncs["sys.NumGC"] = func() float64 {
			memStats := new(runtime.MemStats)
			runtime.ReadMemStats(memStats)
			return float64(memStats.NumGC)
		}
		ms.gaugeFuncs["sys.PauseTotalNs"] = func() float64 {
			memStats := new(runtime.MemStats)
			runtime.ReadMemStats(memStats)
			return float64(memStats.PauseTotalNs)
		}
		ms.gaugeFuncs["sys.NumGoroutine"] = func() float64 {
			return float64(runtime.NumGoroutine())
		}
		ms.gaugeFuncsMu.Unlock()
	}
	return ms
}
// SpecifyPercentiles allows users to override the default collected
// and reported percentiles. Keys are fmt templates (e.g. "%s_99") and
// values are percentiles in [0, 1].
// NOTE(review): this assignment is not synchronized with reaper's reads of
// ms.percentiles — call it before Start, or confirm the race is acceptable.
func (ms *MetricSystem) SpecifyPercentiles(percentiles map[string]float64) {
	ms.percentiles = percentiles
}
// SubscribeToRawMetrics registers a channel to receive RawMetricSets
// periodically generated by reaper at each interval.
func (ms *MetricSystem) SubscribeToRawMetrics(metricStream chan *RawMetricSet) {
	ms.subscribeToRawMetrics <- metricStream
}

// UnsubscribeFromRawMetrics removes a previously registered channel so that
// it stops receiving RawMetricSets at each interval.
func (ms *MetricSystem) UnsubscribeFromRawMetrics(
	metricStream chan *RawMetricSet) {
	ms.unsubscribeFromRawMetrics <- metricStream
}

// SubscribeToProcessedMetrics registers a channel to receive
// ProcessedMetricSets periodically generated by reaper at each interval.
func (ms *MetricSystem) SubscribeToProcessedMetrics(
	metricStream chan *ProcessedMetricSet) {
	ms.subscribeToProcessedMetrics <- metricStream
}

// UnsubscribeFromProcessedMetrics removes a previously registered channel so
// that it stops receiving ProcessedMetricSets at each interval.
func (ms *MetricSystem) UnsubscribeFromProcessedMetrics(
	metricStream chan *ProcessedMetricSet) {
	ms.unsubscribeFromProcessedMetrics <- metricStream
}
// StartTimer begins a timer and returns a token which is required for halting
// the timer. This allows for concurrent timings under the same name.
func (ms *MetricSystem) StartTimer(name string) TimerToken {
	return TimerToken{
		Name:         name,
		Start:        time.Now(),
		MetricSystem: ms,
	}
}

// Stop stops a timer given by StartTimer, submits a Histogram of its duration
// in nanoseconds, and returns its duration in nanoseconds.
func (tt *TimerToken) Stop() time.Duration {
	duration := time.Since(tt.Start)
	tt.MetricSystem.Histogram(tt.Name, float64(duration.Nanoseconds()))
	return duration
}
// Counter is used for recording a running count of the total occurrences of
// a particular event. A rate is also exported for the amount that a counter
// has increased during an interval of this MetricSystem.
func (ms *MetricSystem) Counter(name string, amount uint64) {
	ms.counterMu.RLock()
	_, exists := ms.counterCache[name]
	// perform lock promotion when we need more control
	if exists {
		// Fast path: the cell exists, so an atomic add under the read lock
		// is sufficient.
		atomic.AddUint64(ms.counterCache[name], amount)
		ms.counterMu.RUnlock()
	} else {
		// Slow path: upgrade to the write lock to insert the cell,
		// re-checking existence since another goroutine may have inserted it
		// between our RUnlock and Lock.
		ms.counterMu.RUnlock()
		ms.counterMu.Lock()
		_, syncExists := ms.counterCache[name]
		if !syncExists {
			var z uint64
			ms.counterCache[name] = &z
		}
		atomic.AddUint64(ms.counterCache[name], amount)
		ms.counterMu.Unlock()
	}
}
// Histogram is used for generating rich metrics, such as percentiles, from
// periodically occurring continuous values.
func (ms *MetricSystem) Histogram(name string, value float64) {
	// Values are lossily bucketed to an int16 (within 1% of the true value)
	// before being counted; see compress.
	compressedValue := compress(value)
	ms.histogramMu.RLock()
	_, present := ms.histogramCache[name][compressedValue]
	if present {
		atomic.AddUint64(ms.histogramCache[name][compressedValue], 1)
		ms.histogramMu.RUnlock()
	} else {
		// Lock promotion: re-check under the write lock before creating the
		// (possibly missing) inner map and bucket cell.
		ms.histogramMu.RUnlock()
		ms.histogramMu.Lock()
		_, syncPresent := ms.histogramCache[name][compressedValue]
		if !syncPresent {
			var z uint64
			_, mapPresent := ms.histogramCache[name]
			if !mapPresent {
				ms.histogramCache[name] = make(map[int16]*uint64)
			}
			ms.histogramCache[name][compressedValue] = &z
		}
		atomic.AddUint64(ms.histogramCache[name][compressedValue], 1)
		ms.histogramMu.Unlock()
	}
}
// RegisterGaugeFunc registers a function to be called at each interval
// whose return value will be used to populate the <name> metric.
func (ms *MetricSystem) RegisterGaugeFunc(name string, f func() float64) {
	ms.gaugeFuncsMu.Lock()
	ms.gaugeFuncs[name] = f
	ms.gaugeFuncsMu.Unlock()
}

// DeregisterGaugeFunc deregisters a function for the <name> metric.
// Deregistering an unknown name is a no-op.
func (ms *MetricSystem) DeregisterGaugeFunc(name string) {
	ms.gaugeFuncsMu.Lock()
	delete(ms.gaugeFuncs, name)
	ms.gaugeFuncsMu.Unlock()
}
// compress takes a float64 and lossily shrinks it to an int16 to facilitate
// bucketing of histogram values, staying within 1% of the true value. This
// fails for large values of 1e142 and above, and is inaccurate for values
// closer to 0 than +/- 0.51 or +/- math.Inf.
func compress(value float64) int16 {
	// The bucket index grows logarithmically with magnitude; adding 0.5
	// rounds to the nearest bucket before the int16 truncation.
	bucket := int16(precision*math.Log(1.0+math.Abs(value)) + 0.5)
	if value < 0 {
		return -bucket
	}
	return bucket
}
// decompress takes a lossily shrunk int16 and returns a float64 within 1% of
// the original float64 passed to compress.
func decompress(compressedValue int16) float64 {
	// Invert the logarithmic bucketing applied by compress.
	magnitude := math.Exp(math.Abs(float64(compressedValue))/precision) - 1.0
	if compressedValue < 0 {
		return -magnitude
	}
	return magnitude
}
// processHistograms derives rich metrics from histograms, currently
// percentiles, sum, count, and mean.
func (ms *MetricSystem) processHistograms(name string,
	valuesToCounts map[int16]*uint64) map[string]float64 {
	output := make(map[string]float64)
	totalSum := float64(0)
	totalCount := uint64(0)
	proportions := make([]proportion, 0, len(valuesToCounts))
	// Expand each compressed bucket back into an approximate value and
	// accumulate the interval's sum and count.
	for compressedValue, count := range valuesToCounts {
		value := decompress(compressedValue)
		totalSum += value * float64(*count)
		totalCount += *count
		proportions = append(proportions, proportion{Value: value, Count: *count})
	}
	sumName := fmt.Sprintf("%s_sum", name)
	countName := fmt.Sprintf("%s_count", name)
	avgName := fmt.Sprintf("%s_avg", name)
	// increment interval sum and count
	output[countName] = float64(totalCount)
	output[sumName] = totalSum
	// NOTE(review): when totalCount is 0 this average is NaN — confirm
	// downstream consumers tolerate that.
	output[avgName] = totalSum / float64(totalCount)
	// increment aggregate sum and count
	ms.histogramCountMu.RLock()
	_, present := ms.histogramCountStore[sumName]
	if !present {
		// Lock promotion with re-check, as elsewhere in this package.
		ms.histogramCountMu.RUnlock()
		ms.histogramCountMu.Lock()
		_, syncPresent := ms.histogramCountStore[sumName]
		if !syncPresent {
			var x uint64
			ms.histogramCountStore[sumName] = &x
			var z uint64
			ms.histogramCountStore[countName] = &z
		}
		ms.histogramCountMu.Unlock()
		ms.histogramCountMu.RLock()
	}
	atomic.AddUint64(ms.histogramCountStore[sumName], uint64(totalSum))
	atomic.AddUint64(ms.histogramCountStore[countName], totalCount)
	ms.histogramCountMu.RUnlock()
	for label, p := range ms.percentiles {
		value, err := percentile(totalCount, proportions, p)
		if err != nil {
			glog.Errorf("unable to calculate percentile: %s", err)
		} else {
			// label is a template such as "%s_99.9"; fill in the metric name.
			output[fmt.Sprintf(label, name)] = value
		}
	}
	return output
}
// These next 3 methods are for the implementation of sort.Interface

// Len returns the number of proportions.
func (s proportionArray) Len() int {
	return len(s)
}

// Less orders proportions by ascending Value.
func (s proportionArray) Less(i, j int) bool {
	return s[i].Value < s[j].Value
}

// Swap exchanges the proportions at i and j.
func (s proportionArray) Swap(i, j int) {
	s[i], s[j] = s[j], s[i]
}
// percentile calculates a percentile represented as a float64 between 0 and 1
// inclusive from a proportionArray. totalCount is the sum of all counts of
// elements in the proportionArray. The error is only reachable when the
// requested percentile exceeds 1 (or totalCount disagrees with proportions),
// since by the end of the scan sofar/totalCount reaches 1.
func percentile(totalCount uint64, proportions proportionArray,
	percentile float64) (float64, error) {
	//TODO(tyler) handle multiple percentiles at once for efficiency
	sort.Sort(proportions)
	sofar := uint64(0)
	for _, proportion := range proportions {
		sofar += proportion.Count
		if float64(sofar)/float64(totalCount) >= percentile {
			return proportion.Value, nil
		}
	}
	// Per Go convention (staticcheck ST1005) error strings are lowercase
	// and unpunctuated; the original was "Invalid percentile. Should be...".
	return 0, errors.New("invalid percentile: should be between 0 and 1")
}
// collectRawMetrics snapshots and resets the per-interval caches, folds the
// interval's counter deltas into the cumulative counter store, evaluates all
// gauge functions, and returns the assembled RawMetricSet.
func (ms *MetricSystem) collectRawMetrics() *RawMetricSet {
	// Truncate "now" down to the start of the current interval so all
	// subscribers see aligned timestamps.
	normalizedInterval := time.Unix(0, time.Now().UnixNano()/
		ms.interval.Nanoseconds()*
		ms.interval.Nanoseconds())
	// Swap out the counter cache under the write lock; the old map becomes
	// this interval's set of deltas.
	ms.counterMu.Lock()
	freshCounters := ms.counterCache
	ms.counterCache = make(map[string]*uint64)
	ms.counterMu.Unlock()
	rates := make(map[string]uint64)
	for name, count := range freshCounters {
		rates[name] = *count
	}
	counters := make(map[string]uint64)
	ms.counterStoreMu.RLock()
	// update counters
	for name, count := range freshCounters {
		_, exists := ms.counterStore[name]
		// only take a write lock when it's a totally new counter
		if !exists {
			ms.counterStoreMu.RUnlock()
			ms.counterStoreMu.Lock()
			_, syncExists := ms.counterStore[name]
			if !syncExists {
				var z uint64
				ms.counterStore[name] = &z
			}
			ms.counterStoreMu.Unlock()
			ms.counterStoreMu.RLock()
		}
		atomic.AddUint64(ms.counterStore[name], *count)
	}
	// copy counters for export
	for name, count := range ms.counterStore {
		counters[name] = *count
	}
	ms.counterStoreMu.RUnlock()
	// Swap out the histogram cache the same way.
	ms.histogramMu.Lock()
	histograms := ms.histogramCache
	ms.histogramCache = make(map[string]map[int16]*uint64)
	ms.histogramMu.Unlock()
	// Evaluate every registered gauge function at collection time.
	ms.gaugeFuncsMu.Lock()
	gauges := make(map[string]float64)
	for name, f := range ms.gaugeFuncs {
		gauges[name] = f()
	}
	ms.gaugeFuncsMu.Unlock()
	return &RawMetricSet{
		Time:       normalizedInterval,
		Counters:   counters,
		Rates:      rates,
		Histograms: histograms,
		Gauges:     gauges,
	}
}
// processMetrics (potentially slowly) creates human consumable metrics from a
// RawMetricSet, deriving rich statistics from histograms such as percentiles.
func (ms *MetricSystem) processMetrics(
	rawMetrics *RawMetricSet) *ProcessedMetricSet {
	derived := make(map[string]float64)
	// Counters export their cumulative totals under their own names.
	for name, total := range rawMetrics.Counters {
		derived[name] = float64(total)
	}
	// Rates export the per-interval delta under "<name>_rate".
	for name, delta := range rawMetrics.Rates {
		derived[name+"_rate"] = float64(delta)
	}
	// Histograms expand into percentiles, sum, count, and mean.
	for name, valuesToCounts := range rawMetrics.Histograms {
		for statName, statValue := range ms.processHistograms(name, valuesToCounts) {
			derived[statName] = statValue
		}
	}
	// Gauges pass through unchanged.
	for name, value := range rawMetrics.Gauges {
		derived[name] = value
	}
	return &ProcessedMetricSet{Time: rawMetrics.Time, Metrics: derived}
}
// updateSubscribers drains all pending subscription and unsubscription
// requests without blocking, returning as soon as the four request channels
// are empty.
func (ms *MetricSystem) updateSubscribers() {
	ms.subscribersMu.Lock()
	defer ms.subscribersMu.Unlock()
	for {
		select {
		case subscriber := <-ms.subscribeToRawMetrics:
			ms.rawSubscribers[subscriber] = struct{}{}
		case unsubscriber := <-ms.unsubscribeFromRawMetrics:
			delete(ms.rawSubscribers, unsubscriber)
		case subscriber := <-ms.subscribeToProcessedMetrics:
			ms.processedSubscribers[subscriber] = struct{}{}
		case unsubscriber := <-ms.unsubscribeFromProcessedMetrics:
			delete(ms.processedSubscribers, unsubscriber)
		default: // no changes in subscribers
			return
		}
	}
}
// reaper wakes up every <interval> seconds,
// collects and processes metrics, and pushes
// them to the corresponding subscribing channels.
func (ms *MetricSystem) reaper() {
	ms.reaping = true
	// create goroutine pool to handle multiple processing tasks at once
	processChan := make(chan func(), 16)
	for i := 0; i < int(math.Max(float64(runtime.NumCPU()/4), 4)); i++ {
		go func() {
			for {
				c, ok := <-processChan
				if !ok {
					// processChan closed during shutdown; exit this worker.
					return
				}
				c()
			}
		}()
	}
	// begin reaper main loop
	for {
		// sleep until the next interval, or die if shutdownChan is closed
		tts := ms.interval.Nanoseconds() -
			(time.Now().UnixNano() % ms.interval.Nanoseconds())
		select {
		case <-time.After(time.Duration(tts)):
		case <-ms.shutdownChan:
			ms.reaping = false
			close(processChan)
			return
		}
		rawMetrics := ms.collectRawMetrics()
		ms.updateSubscribers()
		// broadcast raw metrics
		for subscriber := range ms.rawSubscribers {
			// new subscribers get all counters, otherwise just the new diffs
			select {
			case subscriber <- rawMetrics:
				// A successful send resets this subscriber's failure streak.
				delete(ms.rawBadSubscribers, subscriber)
			default:
				ms.rawBadSubscribers[subscriber]++
				glog.Error("a raw subscriber has allowed their channel to fill up. ",
					"dropping their metrics on the floor rather than blocking.")
				// NOTE(review): the threshold here fires on the 2nd
				// consecutive drop, but the log text (and package comment)
				// say 3 — confirm which is intended.
				if ms.rawBadSubscribers[subscriber] >= 2 {
					glog.Error("this raw subscriber has caused dropped metrics at ",
						"least 3 times in a row. closing the channel.")
					delete(ms.rawSubscribers, subscriber)
					close(subscriber)
				}
			}
		}
		// Perform the rest in another goroutine since processing is not
		// gauranteed to complete before the interval is up.
		sendProcessed := func() {
			// this is potentially expensive if there is a massive number of metrics
			processedMetrics := ms.processMetrics(rawMetrics)
			// add aggregate mean
			for name := range rawMetrics.Histograms {
				ms.histogramCountMu.RLock()
				aggCountPtr, countPresent :=
					ms.histogramCountStore[fmt.Sprintf("%s_count", name)]
				aggCount := atomic.LoadUint64(aggCountPtr)
				aggSumPtr, sumPresent :=
					ms.histogramCountStore[fmt.Sprintf("%s_sum", name)]
				aggSum := atomic.LoadUint64(aggSumPtr)
				ms.histogramCountMu.RUnlock()
				if countPresent && sumPresent && aggCount > 0 {
					processedMetrics.Metrics[fmt.Sprintf("%s_agg_avg", name)] =
						float64(aggSum / aggCount)
					processedMetrics.Metrics[fmt.Sprintf("%s_agg_count", name)] =
						float64(aggCount)
					processedMetrics.Metrics[fmt.Sprintf("%s_agg_sum", name)] =
						float64(aggSum)
				}
			}
			// broadcast processed metrics
			ms.subscribersMu.Lock()
			for subscriber := range ms.processedSubscribers {
				select {
				case subscriber <- processedMetrics:
					delete(ms.processedBadSubscribers, subscriber)
				default:
					ms.processedBadSubscribers[subscriber]++
					glog.Error("a subscriber has allowed their channel to fill up. ",
						"dropping their metrics on the floor rather than blocking.")
					if ms.processedBadSubscribers[subscriber] >= 2 {
						glog.Error("this subscriber has caused dropped metrics at ",
							"least 3 times in a row. closing the channel.")
						delete(ms.processedSubscribers, subscriber)
						close(subscriber)
					}
				}
			}
			ms.subscribersMu.Unlock()
		}
		// Hand processing off to the worker pool; never block the reaper.
		select {
		case processChan <- sendProcessed:
		default:
			// processChan has filled up, this metric load is not sustainable
			glog.Errorf("processing of metrics is taking longer than this node can "+
				"handle. dropping this entire interval of %s metrics on the "+
				"floor rather than blocking the reaper.", rawMetrics.Time)
		}
	} // end main reaper loop
}
// Start spawns a goroutine for merging metrics into caches from
// metric submitters, and a reaper goroutine that harvests metrics at the
// default interval of every 60 seconds.
// NOTE(review): ms.reaping is read here without synchronization against its
// writes in reaper(); concurrent Start calls could spawn two reapers.
func (ms *MetricSystem) Start() {
	if !ms.reaping {
		go ms.reaper()
	}
}

// Stop shuts down a MetricSystem by closing its shutdown channel.
// Calling Stop more than once panics (double close of shutdownChan).
func (ms *MetricSystem) Stop() {
	close(ms.shutdownChan)
}

View File

@ -0,0 +1,363 @@
// Copyright 2014 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License. See the AUTHORS file
// for names of contributors.
//
// Author: Tyler Neely (t@jujit.su)
package loghisto
import (
"fmt"
"math"
"runtime"
"testing"
"time"
)
// ExampleMetricSystem demonstrates subscribing to processed metrics and
// checks that every expected derived metric name is populated.
// Fix: the label read "aggregate man"; it now reads "aggregate mean",
// updated consistently in both the label and the // Output: block.
func ExampleMetricSystem() {
	ms := NewMetricSystem(time.Microsecond, true)
	ms.Start()
	myMetricStream := make(chan *ProcessedMetricSet, 2)
	ms.SubscribeToProcessedMetrics(myMetricStream)
	timeToken := ms.StartTimer("submit_metrics")
	ms.Counter("range_splits", 1)
	ms.Histogram("some_ipc_latency", 123)
	timeToken.Stop()
	processedMetricSet := <-myMetricStream
	ms.UnsubscribeFromProcessedMetrics(myMetricStream)
	m := processedMetricSet.Metrics
	example := []struct {
		Name  string
		Value float64
	}{
		{
			"total range splits during the process lifetime",
			m["range_splits"],
		}, {
			"range splits in this period",
			m["range_splits_rate"],
		}, {
			"some_ipc 99.9th percentile",
			m["some_ipc_latency_99.9"],
		}, {
			"some_ipc max",
			m["some_ipc_latency_max"],
		}, {
			"some_ipc calls this period",
			m["some_ipc_latency_count"],
		}, {
			"some_ipc calls during the process lifetime",
			m["some_ipc_latency_agg_count"],
		}, {
			"some_ipc total latency this period",
			m["some_ipc_latency_sum"],
		}, {
			"some_ipc mean this period",
			m["some_ipc_latency_avg"],
		}, {
			"some_ipc aggregate mean",
			m["some_ipc_latency_agg_avg"],
		}, {
			"time spent submitting metrics this period",
			m["submit_metrics_sum"],
		}, {
			"number of goroutines",
			m["sys.NumGoroutine"],
		}, {
			"time spent in GC",
			m["sys.PauseTotalNs"],
		},
	}
	for _, nameValue := range example {
		var result string
		// a zero value means the metric was never recorded.
		if nameValue.Value == float64(0) {
			result = "NOT present"
		} else {
			result = "present"
		}
		fmt.Println(nameValue.Name, result)
	}
	ms.Stop()
	// Output:
	// total range splits during the process lifetime present
	// range splits in this period present
	// some_ipc 99.9th percentile present
	// some_ipc max present
	// some_ipc calls this period present
	// some_ipc calls during the process lifetime present
	// some_ipc total latency this period present
	// some_ipc mean this period present
	// some_ipc aggregate mean present
	// time spent submitting metrics this period present
	// number of goroutines present
	// time spent in GC present
}
// TestPercentile feeds a known bucketed distribution through percentile()
// and requires each queried percentile to land within 1% of expectation.
func TestPercentile(t *testing.T) {
	histogram := map[float64]uint64{
		10:  9000,
		25:  900,
		33:  90,
		47:  9,
		500: 1,
	}
	expectations := map[float64]float64{
		0:     10,
		.99:   25,
		.999:  33,
		.9991: 47,
		.9999: 47,
		1:     500,
	}
	var total uint64
	props := make([]proportion, 0, len(histogram))
	for value, count := range histogram {
		total += count
		props = append(props, proportion{Value: value, Count: count})
	}
	for pct, want := range expectations {
		got, err := percentile(total, props, pct)
		if err != nil {
			t.Error("error:", err)
		}
		// results must be within 1% of their expected values.
		if off := math.Abs(want/got - 1); off > .01 {
			t.Errorf("percentile: %.04f, expected: %.04f, actual: %.04f, %% off: %.04f\n",
				pct, want, got, off*100)
		}
	}
}
// TestCompress round-trips representative values through compress and
// decompress, requiring the result to be within 1% of the input
// (absolute difference when the result is exactly zero).
func TestCompress(t *testing.T) {
	samples := []float64{
		-421408208120481,
		-1,
		0,
		1,
		214141241241241,
	}
	for _, v := range samples {
		roundTripped := decompress(compress(v))
		var off float64
		if roundTripped == 0 {
			off = math.Abs(v - roundTripped)
		} else {
			off = math.Abs(v/roundTripped - 1)
		}
		if off > .01 {
			t.Errorf("expected: %f, actual: %f, %% off: %.04f\n",
				v, roundTripped, off*100)
		}
	}
}
// TestSysStats checks that process gauges (here sys.Alloc) are collected
// when the metric system is created with process stats enabled.
func TestSysStats(t *testing.T) {
	ms := NewMetricSystem(time.Microsecond, true)
	rawGauges := ms.collectRawMetrics().Gauges
	alloc, ok := rawGauges["sys.Alloc"]
	if alloc <= 0 || !ok {
		t.Errorf("expected positive reported allocated bytes, got %f\n", alloc)
	}
}
// TestTimer runs overlapping timers of different durations and checks the
// resulting distribution is ordered: min <= median (50th) <= max.
func TestTimer(t *testing.T) {
	metricSystem := NewMetricSystem(time.Microsecond, false)
	token1 := metricSystem.StartTimer("timer1")
	token2 := metricSystem.StartTimer("timer1")
	// BUG FIX: this read "50 & time.Microsecond" — a bitwise AND of 50
	// and 1000, i.e. a 32ns sleep — instead of the intended 50µs.
	time.Sleep(50 * time.Microsecond)
	token1.Stop()
	time.Sleep(5 * time.Microsecond)
	token2.Stop()
	token3 := metricSystem.StartTimer("timer1")
	time.Sleep(10 * time.Microsecond)
	token3.Stop()
	result := metricSystem.processMetrics(metricSystem.collectRawMetrics()).Metrics
	if result["timer1_min"] > result["timer1_50"] ||
		result["timer1_50"] > result["timer1_max"] {
		t.Error("bad result map:", result)
	}
}
// TestRate verifies that the _rate derived metric reflects only the
// counts added since the previous harvest, not the running total.
func TestRate(t *testing.T) {
	ms := NewMetricSystem(time.Microsecond, false)
	harvest := func() map[string]float64 {
		return ms.processMetrics(ms.collectRawMetrics()).Metrics
	}
	ms.Counter("rate1", 777)
	time.Sleep(20 * time.Millisecond)
	if harvest()["rate1_rate"] != 777 {
		t.Error("count one value")
	}
	ms.Counter("rate1", 1223)
	time.Sleep(20 * time.Millisecond)
	if got := harvest()["rate1_rate"]; got != 1223 {
		t.Errorf("expected rate: 1223, actual: %f", got)
	}
	ms.Counter("rate1", 1223)
	ms.Counter("rate1", 1223)
	time.Sleep(20 * time.Millisecond)
	if got := harvest()["rate1_rate"]; got != 2446 {
		t.Errorf("expected rate: 2446, actual: %f", got)
	}
}
// TestCounter verifies that counter values accumulate across harvests
// rather than resetting each interval.
func TestCounter(t *testing.T) {
	ms := NewMetricSystem(time.Microsecond, false)
	harvest := func() map[string]float64 {
		return ms.processMetrics(ms.collectRawMetrics()).Metrics
	}
	ms.Counter("counter1", 3290)
	time.Sleep(20 * time.Millisecond)
	if m := harvest(); m["counter1"] != 3290 {
		t.Error("count one value", m)
	}
	ms.Counter("counter1", 10000)
	time.Sleep(20 * time.Millisecond)
	if harvest()["counter1"] != 13290 {
		t.Error("accumulate counts across broadcasts")
	}
}
// TestUpdateSubscribers checks that subscribers receive metrics while
// subscribed and receive nothing after unsubscribing.
// Fix: the timeout error messages claimed "2 milliseconds" while the
// actual time.After timeouts are 20 milliseconds.
func TestUpdateSubscribers(t *testing.T) {
	rawMetricStream := make(chan *RawMetricSet)
	processedMetricStream := make(chan *ProcessedMetricSet)
	metricSystem := NewMetricSystem(2*time.Microsecond, false)
	metricSystem.SubscribeToRawMetrics(rawMetricStream)
	metricSystem.SubscribeToProcessedMetrics(processedMetricStream)
	metricSystem.Counter("counter5", 33)
	go func() {
		select {
		case <-rawMetricStream:
		case <-time.After(20 * time.Millisecond):
			t.Error("received no raw metrics from the MetricSystem after 20 milliseconds.")
		}
		metricSystem.UnsubscribeFromRawMetrics(rawMetricStream)
	}()
	go func() {
		select {
		case <-processedMetricStream:
		case <-time.After(20 * time.Millisecond):
			t.Error("received no processed metrics from the MetricSystem after 20 milliseconds.")
		}
		metricSystem.UnsubscribeFromProcessedMetrics(processedMetricStream)
	}()
	metricSystem.Start()
	time.Sleep(20 * time.Millisecond)
	// after unsubscribing above, nothing further may arrive on either stream.
	go func() {
		select {
		case <-rawMetricStream:
			t.Error("received raw metrics from the MetricSystem after unsubscribing.")
		default:
		}
	}()
	go func() {
		select {
		case <-processedMetricStream:
			t.Error("received processed metrics from the MetricSystem after unsubscribing.")
		default:
		}
	}()
	time.Sleep(20 * time.Millisecond)
}
// TestProcessedBroadcast verifies derived histogram metrics (_sum,
// _agg_avg, _count) reach a processed-metrics subscriber.
// Fix: the timeout message claimed "2 milliseconds"; the timeout is 20ms.
func TestProcessedBroadcast(t *testing.T) {
	processedMetricStream := make(chan *ProcessedMetricSet, 128)
	metricSystem := NewMetricSystem(time.Microsecond, false)
	metricSystem.SubscribeToProcessedMetrics(processedMetricStream)
	metricSystem.Histogram("histogram1", 33)
	metricSystem.Histogram("histogram1", 59)
	metricSystem.Histogram("histogram1", 330000)
	metricSystem.Start()
	select {
	case processedMetrics := <-processedMetricStream:
		if int(processedMetrics.Metrics["histogram1_sum"]) != 331132 {
			t.Error("expected histogram1_sum to be 331132, instead was",
				processedMetrics.Metrics["histogram1_sum"])
		}
		if int(processedMetrics.Metrics["histogram1_agg_avg"]) != 110377 {
			t.Error("expected histogram1_agg_avg to be 110377, instead was",
				processedMetrics.Metrics["histogram1_agg_avg"])
		}
		if int(processedMetrics.Metrics["histogram1_count"]) != 3 {
			t.Error("expected histogram1_count to be 3, instead was",
				processedMetrics.Metrics["histogram1_count"])
		}
	case <-time.After(20 * time.Millisecond):
		t.Error("received no metrics from the MetricSystem after 20 milliseconds.")
	}
	metricSystem.UnsubscribeFromProcessedMetrics(processedMetricStream)
	metricSystem.Stop()
}
// TestRawBroadcast verifies raw counter and rate values reach a
// raw-metrics subscriber.
// Fixes: the rate-mismatch error printed Counters["counter2"] instead of
// the Rates value actually being tested, and the timeout message claimed
// "2 milliseconds" while the timeout is 20ms.
func TestRawBroadcast(t *testing.T) {
	rawMetricStream := make(chan *RawMetricSet, 128)
	metricSystem := NewMetricSystem(time.Microsecond, false)
	metricSystem.SubscribeToRawMetrics(rawMetricStream)
	metricSystem.Counter("counter2", 10)
	metricSystem.Counter("counter2", 111)
	metricSystem.Start()
	select {
	case rawMetrics := <-rawMetricStream:
		if rawMetrics.Counters["counter2"] != 121 {
			t.Error("expected counter2 to be 121, instead was",
				rawMetrics.Counters["counter2"])
		}
		if rawMetrics.Rates["counter2"] != 121 {
			t.Error("expected counter2 rate to be 121, instead was",
				rawMetrics.Rates["counter2"])
		}
	case <-time.After(20 * time.Millisecond):
		t.Error("received no metrics from the MetricSystem after 20 milliseconds.")
	}
	metricSystem.UnsubscribeFromRawMetrics(rawMetricStream)
	metricSystem.Stop()
}
// TestMetricSystemStop checks that Start followed by Stop does not leak
// goroutines (allowing a short grace period for teardown).
func TestMetricSystemStop(t *testing.T) {
	ms := NewMetricSystem(time.Microsecond, false)
	before := runtime.NumGoroutine()
	ms.Start()
	ms.Stop()
	time.Sleep(20 * time.Millisecond)
	after := runtime.NumGoroutine()
	if before < after {
		t.Errorf("lingering goroutines have not been cleaned up: "+
			"before: %d, after: %d\n", before, after)
	}
}

View File

@ -0,0 +1,85 @@
// Copyright 2014 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License. See the AUTHORS file
// for names of contributors.
//
// Author: Tyler Neely (t@jujit.su)
package loghisto
import (
"bytes"
"fmt"
"os"
"strings"
)
// openTSDBStat is a single metric observation expressed in OpenTSDB
// terms: a metric name, a Unix-seconds timestamp, a value, and tags.
type openTSDBStat struct {
	Metric string
	Time int64
	Value float64
	Tags map[string]string
}
// openTSDBStatArray is a batch of stats serializable into one
// OpenTSDB "put" request payload (see ToRequest).
type openTSDBStatArray []*openTSDBStat
// mapToTSDProtocolTags renders a tag map as space-separated
// "key=value" pairs, the form OpenTSDB's telnet protocol expects.
// Map iteration order is unspecified, so pair order varies.
func mapToTSDProtocolTags(tagMap map[string]string) string {
	pairs := make([]string, 0, len(tagMap))
	for k, v := range tagMap {
		pairs = append(pairs, k+"="+v)
	}
	return strings.Join(pairs, " ")
}
// ToRequest serializes the batch into OpenTSDB's line-oriented
// "put <metric> <time> <value> <tags>" wire format, one line per stat.
func (stats openTSDBStatArray) ToRequest() []byte {
	var request bytes.Buffer
	for _, stat := range stats {
		// Fprintf writes straight into the buffer, avoiding the
		// Sprintf string + []byte conversion copies of the original.
		fmt.Fprintf(&request, "put %s %d %f %s\n",
			stat.Metric,
			stat.Time,
			stat.Value,
			mapToTSDProtocolTags(stat.Tags))
	}
	// Bytes() avoids the extra String()->[]byte copy; callers treat the
	// returned payload as read-only.
	return request.Bytes()
}
// toopenTSDBStats converts a processed metric set into openTSDBStats,
// tagging each with the local hostname ("unknown" if lookup fails).
// Fix: removed a dead counter variable `i` that was incremented on each
// iteration but never read.
func (metricSet *ProcessedMetricSet) toopenTSDBStats() openTSDBStatArray {
	hostname, err := os.Hostname()
	if err != nil {
		hostname = "unknown"
	}
	stats := make([]*openTSDBStat, 0, len(metricSet.Metrics))
	for metric, value := range metricSet.Metrics {
		//TODO(tyler) custom tags
		stats = append(stats, &openTSDBStat{
			Metric: metric,
			Time:   metricSet.Time.Unix(),
			Value:  value,
			Tags: map[string]string{
				"host": hostname,
			},
		})
	}
	return stats
}
// OpenTSDBProtocol generates a wire representation of a ProcessedMetricSet
// for submission to an OpenTSDB instance.
func OpenTSDBProtocol(ms *ProcessedMetricSet) []byte {
	stats := ms.toopenTSDBStats()
	return stats.ToRequest()
}

View File

@ -0,0 +1,23 @@
package loghisto
import (
"testing"
"time"
)
// TestOpenTSDB exercises the serialize-and-submit path against a local
// address. The submit error is deliberately ignored: nothing is expected
// to be listening, and the test only checks that the code path runs.
func TestOpenTSDB(t *testing.T) {
	metricSystem := NewMetricSystem(time.Second, true)
	submitter := NewSubmitter(metricSystem, OpenTSDBProtocol, "tcp", "localhost:7777")
	submitter.Start()
	sample := &ProcessedMetricSet{
		Time: time.Now(),
		Metrics: map[string]float64{
			"test.1": 43.32,
			"test.2": 12.3,
		},
	}
	payload := submitter.serializer(sample)
	submitter.submit(payload)
	submitter.Shutdown()
}

View File

@ -0,0 +1,106 @@
// Copyright 2014 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License. See the AUTHORS file
// for names of contributors.
//
// Author: Tyler Neely (t@jujit.su)
package loghisto
import (
"fmt"
"os"
"runtime"
"text/tabwriter"
"time"
)
// PrintBenchmark will run the provided function at the specified
// concurrency, time the operation, and once per second write the
// following information to standard out:
//
// 2014-08-09 17:44:57 -0400 EDT
// raft_AppendLogEntries_count: 16488
// raft_AppendLogEntries_max: 3.982478339757623e+07
// raft_AppendLogEntries_99.99: 3.864778314316012e+07
// raft_AppendLogEntries_99.9: 3.4366224772310276e+06
// raft_AppendLogEntries_99: 2.0228126576114902e+06
// raft_AppendLogEntries_50: 469769.7083161708
// raft_AppendLogEntries_min: 129313.15075081984
// raft_AppendLogEntries_sum: 9.975892639594093e+09
// raft_AppendLogEntries_avg: 605039.5827022133
// raft_AppendLogEntries_agg_avg: 618937
// raft_AppendLogEntries_agg_count: 121095
// raft_AppendLogEntries_agg_sum: 7.4950269894e+10
// sys.Alloc: 997328
// sys.NumGC: 1115
// sys.PauseTotalNs: 2.94946542e+08
// sys.NumGoroutine: 26
//
// This function never returns.
func PrintBenchmark(name string, concurrency uint, op func()) {
	runtime.GOMAXPROCS(runtime.NumCPU())
	ms := NewMetricSystem(time.Second, true)
	mc := make(chan *ProcessedMetricSet, 1)
	ms.SubscribeToProcessedMetrics(mc)
	ms.Start()
	defer ms.Stop()
	// printing happens on a separate goroutine fed by the subscription.
	go receiver(name, mc)
	// spin up `concurrency` workers that time op() in a tight loop.
	for worker := uint(0); worker < concurrency; worker++ {
		go func() {
			for {
				timer := ms.StartTimer(name)
				op()
				timer.Stop()
			}
		}()
	}
	// block forever; the workers and receiver keep running.
	select {}
}
// receiver drains processed metric sets from mc and pretty-prints the
// interesting per-benchmark and process-level metrics to stdout in a
// tab-aligned table, one batch per received set. Runs until mc closes.
func receiver(name string, mc chan *ProcessedMetricSet) {
	keys := []string{
		fmt.Sprintf("%s_count", name),
		fmt.Sprintf("%s_max", name),
		fmt.Sprintf("%s_99.99", name),
		fmt.Sprintf("%s_99.9", name),
		fmt.Sprintf("%s_99", name),
		fmt.Sprintf("%s_95", name),
		fmt.Sprintf("%s_90", name),
		fmt.Sprintf("%s_75", name),
		fmt.Sprintf("%s_50", name),
		fmt.Sprintf("%s_min", name),
		fmt.Sprintf("%s_sum", name),
		fmt.Sprintf("%s_avg", name),
		fmt.Sprintf("%s_agg_avg", name),
		fmt.Sprintf("%s_agg_count", name),
		fmt.Sprintf("%s_agg_sum", name),
		"sys.Alloc",
		"sys.NumGC",
		"sys.PauseTotalNs",
		"sys.NumGoroutine",
	}
	tw := new(tabwriter.Writer)
	tw.Init(os.Stdout, 0, 8, 0, '\t', 0)
	for metricSet := range mc {
		fmt.Fprintln(tw, metricSet.Time)
		for _, key := range keys {
			fmt.Fprintln(tw, fmt.Sprintf("%s:\t", key), metricSet.Metrics[key])
		}
		fmt.Fprintln(tw)
		tw.Flush()
	}
}

View File

@ -0,0 +1,113 @@
loghisto
============
[![Build Status](https://travis-ci.org/spacejam/loghisto.svg)](https://travis-ci.org/spacejam/loghisto)
A metric system for high performance counters and histograms. Unlike popular metric systems today, this does not destroy the accuracy of histograms by sampling. Instead, a logarithmic bucketing function compresses values, generally within 1% of their true value (although between 0 and 1 the precision loss may not be within this boundary). This allows for extreme compression, which allows us to calculate arbitrarily high percentiles with no loss of accuracy - just a small amount of precision. This is particularly useful for highly-clustered events that are tolerant of a small precision loss, but for which you REALLY care about what the tail looks like, such as measuring latency across a distributed system.
Copied out of my work for the CockroachDB metrics system. Based on an algorithm created by Keith Frost.
### running a print benchmark for quick analysis
```go
package main
import (
"runtime"
"github.com/spacejam/loghisto"
)
func benchmark() {
// do some stuff
}
func main() {
numCPU := runtime.NumCPU()
runtime.GOMAXPROCS(numCPU)
desiredConcurrency := uint(100)
loghisto.PrintBenchmark("benchmark1234", desiredConcurrency, benchmark)
}
```
results in something like this printed to stdout each second:
```
2014-12-11 21:41:45 -0500 EST
benchmark1234_count: 2.0171025e+07
benchmark1234_max: 2.4642914167480484e+07
benchmark1234_99.99: 4913.768840299134
benchmark1234_99.9: 1001.2472422902518
benchmark1234_99: 71.24044000732538
benchmark1234_95: 67.03348428941965
benchmark1234_90: 65.68633104092515
benchmark1234_75: 63.07152259993664
benchmark1234_50: 58.739891704145194
benchmark1234_min: -657.5233632152207 // Corollary: time.Since(time.Now()) is often < 0
benchmark1234_sum: 1.648051169322668e+09
benchmark1234_avg: 81.70388809307748
benchmark1234_agg_avg: 89
benchmark1234_agg_count: 6.0962226e+07
benchmark1234_agg_sum: 5.454779078e+09
sys.Alloc: 1.132672e+06
sys.NumGC: 5741
sys.PauseTotalNs: 1.569390954e+09
sys.NumGoroutine: 113
```
### adding an embedded metric system to your code
```go
import (
"time"
"fmt"
"github.com/spacejam/loghisto"
)
func ExampleMetricSystem() {
// Create metric system that reports once a minute, and includes stats
// about goroutines, memory usage and GC.
includeGoProcessStats := true
ms := loghisto.NewMetricSystem(time.Minute, includeGoProcessStats)
ms.Start()
// create a channel that subscribes to metrics as they are produced once
// per minute.
// NOTE: if you allow this channel to fill up, the metric system will NOT
// block, and will FORGET about your channel if you fail to unblock the
// channel after 3 configured intervals (in this case 3 minutes) rather
// than causing a memory leak.
myMetricStream := make(chan *loghisto.ProcessedMetricSet, 2)
ms.SubscribeToProcessedMetrics(myMetricStream)
// create some metrics
timeToken := ms.StartTimer("time for creating a counter and histo")
ms.Counter("some event", 1)
ms.Histogram("some measured thing", 123)
timeToken.Stop()
for m := range myMetricStream {
fmt.Printf("number of goroutines: %f\n", m.Metrics["sys.NumGoroutine"])
}
// if you want to manually unsubscribe from the metric stream
ms.UnsubscribeFromProcessedMetrics(myMetricStream)
// to stop and clean up your metric system
ms.Stop()
}
```
### automatically sending your metrics to OpenTSDB, KairosDB or Graphite
```go
func ExampleExternalSubmitter() {
includeGoProcessStats := true
ms := NewMetricSystem(time.Minute, includeGoProcessStats)
ms.Start()
// graphite
s := NewSubmitter(ms, GraphiteProtocol, "tcp", "localhost:7777")
s.Start()
// opentsdb / kairosdb
s = NewSubmitter(ms, OpenTSDBProtocol, "tcp", "localhost:7777")
s.Start()
// to tear down:
s.Shutdown()
}
```
See code for the Graphite/OpenTSDB protocols for adding your own output plugins, it's pretty simple.

View File

@ -0,0 +1,159 @@
// Copyright 2014 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License. See the AUTHORS file
// for names of contributors.
//
// Author: Tyler Neely (t@jujit.su)
package loghisto
import (
"net"
"sync"
"time"
)
// requestable is a placeholder for any value that can be turned into a
// wire-format request. It currently carries no methods.
type requestable interface{}
// requestableArray is implemented by batches that can render themselves
// into a single wire-format request body (e.g. openTSDBStatArray).
type requestableArray interface {
	ToRequest() []byte
}
// Submitter encapsulates the state of a metric submitter.
type Submitter struct {
	// backlog works as an evicting queue: a fixed ring of 60 serialized
	// requests awaiting delivery.
	backlog [60][]byte
	// backlogHead indexes the oldest pending request; backlogTail is the
	// next free slot. head == tail means the ring is empty.
	backlogHead uint
	backlogTail uint
	// backlogMu guards backlog, backlogHead and backlogTail.
	backlogMu sync.Mutex
	// serializer converts a metric set into a wire-format payload.
	serializer func(*ProcessedMetricSet) []byte
	// DestinationNetwork and DestinationAddress are passed to
	// net.DialTimeout when submitting.
	DestinationNetwork string
	DestinationAddress string
	metricSystem *MetricSystem
	// metricChan receives processed metrics subscribed from metricSystem.
	metricChan chan *ProcessedMetricSet
	// shutdownChan, once closed, stops the goroutines spawned by Start.
	shutdownChan chan struct{}
}
// NewSubmitter creates a Submitter that receives metrics off of a
// specified metric channel, serializes them using the provided
// serialization function, and attempts to send them to the
// specified destination.
func NewSubmitter(metricSystem *MetricSystem,
	serializer func(*ProcessedMetricSet) []byte, destinationNetwork string,
	destinationAddress string) *Submitter {
	// subscribe before returning so no metric intervals are missed.
	subscription := make(chan *ProcessedMetricSet, 60)
	metricSystem.SubscribeToProcessedMetrics(subscription)
	// backlog ring indices start at their zero values (empty ring).
	s := &Submitter{
		serializer:         serializer,
		DestinationNetwork: destinationNetwork,
		DestinationAddress: destinationAddress,
		metricSystem:       metricSystem,
		metricChan:         subscription,
		shutdownChan:       make(chan struct{}),
	}
	return s
}
// retryBacklog drains the backlog ring in FIFO order, submitting each
// pending request. It stops and returns nil when the ring is empty, or
// returns the first submission error (leaving the failed entry at the
// head so a later retry resends it).
func (s *Submitter) retryBacklog() error {
	var request []byte
	for {
		// snapshot head/tail and copy out the head entry under the lock;
		// the network submit below happens with the lock released so
		// appendToBacklog is never blocked on I/O.
		s.backlogMu.Lock()
		head := s.backlogHead
		tail := s.backlogTail
		if head != tail {
			request = s.backlog[head]
		}
		s.backlogMu.Unlock()
		// head == tail means the ring is empty: nothing left to send.
		if head == tail {
			return nil
		}
		err := s.submit(request)
		if err != nil {
			// leave the entry in place; it will be retried next cycle.
			return err
		}
		// only advance past the entry once it was sent successfully.
		// NOTE(review): head may have been evicted by appendToBacklog
		// while unlocked, in which case this skips one entry — presumed
		// acceptable for best-effort metrics delivery.
		s.backlogMu.Lock()
		s.backlogHead = (s.backlogHead + 1) % 60
		s.backlogMu.Unlock()
	}
}
// appendToBacklog enqueues a serialized request onto the 60-slot ring,
// evicting the oldest pending request when the ring is already full.
func (s *Submitter) appendToBacklog(request []byte) {
	s.backlogMu.Lock()
	defer s.backlogMu.Unlock()
	s.backlog[s.backlogTail] = request
	s.backlogTail = (s.backlogTail + 1) % 60
	// tail caught up with head: the ring is full, drop the oldest entry.
	if s.backlogHead == s.backlogTail {
		s.backlogHead = (s.backlogHead + 1) % 60
	}
}
// submit dials the configured destination and writes the serialized
// request, bounding both the dial and the I/O with 5-second limits.
// Fixes: the connection is now closed via defer (released on every
// path), and the previously ignored SetDeadline error is surfaced.
func (s *Submitter) submit(request []byte) error {
	conn, err := net.DialTimeout(s.DestinationNetwork, s.DestinationAddress,
		5*time.Second)
	if err != nil {
		return err
	}
	// the Close error is intentionally dropped: delivery is best-effort.
	defer conn.Close()
	if err := conn.SetDeadline(time.Now().Add(5 * time.Second)); err != nil {
		return err
	}
	_, err = conn.Write(request)
	return err
}
// Start creates the goroutines that receive, serialize, and send metrics.
func (s *Submitter) Start() {
	// serializer goroutine: drains the metric subscription into the backlog.
	go func() {
		for {
			select {
			case <-s.shutdownChan:
				return
			case metrics, ok := <-s.metricChan:
				if !ok {
					// We can no longer make progress.
					return
				}
				s.appendToBacklog(s.serializer(metrics))
			}
		}
	}()
	// sender goroutine: flushes the backlog, then sleeps until the next
	// metric-interval boundary before trying again.
	go func() {
		for {
			select {
			case <-s.shutdownChan:
				return
			default:
				s.retryBacklog()
				interval := s.metricSystem.interval.Nanoseconds()
				tts := interval - (time.Now().UnixNano() % interval)
				time.Sleep(time.Duration(tts))
			}
		}
	}()
}
// Shutdown shuts down a submitter. Calling it more than once is a no-op:
// the close is skipped when shutdownChan is already closed.
func (s *Submitter) Shutdown() {
	select {
	case <-s.shutdownChan:
		// already closed
		return
	default:
	}
	close(s.shutdownChan)
}