diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 34a26d2dc..9f254757b 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -10,6 +10,10 @@ "Comment": "null-5", "Rev": "75cd24fc2f2c2a2088577d12123ddee5f54e0675" }, + { + "ImportPath": "github.com/akrennmair/gopcap", + "Rev": "00e11033259acb75598ba416495bb708d864a010" + }, { "ImportPath": "github.com/beorn7/perks/quantile", "Rev": "b965b613227fddccbfffe13eae360ed3fa822f8d" @@ -105,6 +109,10 @@ "ImportPath": "github.com/prometheus/procfs", "Rev": "454a56f35412459b5e684fd5ec0f9211b94f002a" }, + { + "ImportPath": "github.com/spacejam/loghisto", + "Rev": "323309774dec8b7430187e46cd0793974ccca04a" + }, { "ImportPath": "github.com/stretchr/testify/assert", "Rev": "9cc77fa25329013ce07362c7742952ff887361f2" diff --git a/Godeps/_workspace/src/github.com/akrennmair/gopcap/.gitignore b/Godeps/_workspace/src/github.com/akrennmair/gopcap/.gitignore new file mode 100644 index 000000000..99c61aa76 --- /dev/null +++ b/Godeps/_workspace/src/github.com/akrennmair/gopcap/.gitignore @@ -0,0 +1,5 @@ +#* +*~ +/tools/pass/pass +/tools/pcaptest/pcaptest +/tools/tcpdump/tcpdump diff --git a/Godeps/_workspace/src/github.com/akrennmair/gopcap/LICENSE b/Godeps/_workspace/src/github.com/akrennmair/gopcap/LICENSE new file mode 100644 index 000000000..385fac9f2 --- /dev/null +++ b/Godeps/_workspace/src/github.com/akrennmair/gopcap/LICENSE @@ -0,0 +1,27 @@ +Copyright (c) 2009-2011 Andreas Krennmair. All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above +copyright notice, this list of conditions and the following disclaimer +in the documentation and/or other materials provided with the +distribution. + * Neither the name of Andreas Krennmair nor the names of its +contributors may be used to endorse or promote products derived from +this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/Godeps/_workspace/src/github.com/akrennmair/gopcap/README.mkd b/Godeps/_workspace/src/github.com/akrennmair/gopcap/README.mkd new file mode 100644 index 000000000..f30ff9954 --- /dev/null +++ b/Godeps/_workspace/src/github.com/akrennmair/gopcap/README.mkd @@ -0,0 +1,11 @@ +# PCAP + +This is a simple wrapper around libpcap for Go. Originally written by Andreas +Krennmair and only minorly touched up by Mark Smith . + +Please see the included pcaptest.go and tcpdump.go programs for instructions on +how to use this library. + +Miek Gieben has created a more Go-like package and replaced functionality +with standard functions from the standard library. The package has also been renamed to +pcap. diff --git a/Godeps/_workspace/src/github.com/akrennmair/gopcap/decode.go b/Godeps/_workspace/src/github.com/akrennmair/gopcap/decode.go new file mode 100644 index 000000000..4db19c0a5 --- /dev/null +++ b/Godeps/_workspace/src/github.com/akrennmair/gopcap/decode.go @@ -0,0 +1,527 @@ +package pcap + +import ( + "encoding/binary" + "fmt" + "net" + "reflect" + "strings" +) + +const ( + TYPE_IP = 0x0800 + TYPE_ARP = 0x0806 + TYPE_IP6 = 0x86DD + TYPE_VLAN = 0x8100 + + IP_ICMP = 1 + IP_INIP = 4 + IP_TCP = 6 + IP_UDP = 17 +) + +const ( + ERRBUF_SIZE = 256 + + // According to pcap-linktype(7). + LINKTYPE_NULL = 0 + LINKTYPE_ETHERNET = 1 + LINKTYPE_TOKEN_RING = 6 + LINKTYPE_ARCNET = 7 + LINKTYPE_SLIP = 8 + LINKTYPE_PPP = 9 + LINKTYPE_FDDI = 10 + LINKTYPE_ATM_RFC1483 = 100 + LINKTYPE_RAW = 101 + LINKTYPE_PPP_HDLC = 50 + LINKTYPE_PPP_ETHER = 51 + LINKTYPE_C_HDLC = 104 + LINKTYPE_IEEE802_11 = 105 + LINKTYPE_FRELAY = 107 + LINKTYPE_LOOP = 108 + LINKTYPE_LINUX_SLL = 113 + LINKTYPE_LTALK = 104 + LINKTYPE_PFLOG = 117 + LINKTYPE_PRISM_HEADER = 119 + LINKTYPE_IP_OVER_FC = 122 + LINKTYPE_SUNATM = 123 + LINKTYPE_IEEE802_11_RADIO = 127 + LINKTYPE_ARCNET_LINUX = 129 + LINKTYPE_LINUX_IRDA = 144 + LINKTYPE_LINUX_LAPD = 177 +) + +type addrHdr interface { + SrcAddr() string + DestAddr() string + Len() int +} + +type addrStringer interface { + String(addr addrHdr) string +} + +func decodemac(pkt []byte) uint64 { + mac := uint64(0) + for i := uint(0); i < 6; i++ { + mac = (mac << 8) + uint64(pkt[i]) + } + return mac +} + +// Decode decodes the headers of a Packet. +func (p *Packet) Decode() { + if len(p.Data) <= 14 { + return + } + + p.Type = int(binary.BigEndian.Uint16(p.Data[12:14])) + p.DestMac = decodemac(p.Data[0:6]) + p.SrcMac = decodemac(p.Data[6:12]) + + if len(p.Data) >= 15 { + p.Payload = p.Data[14:] + } + + switch p.Type { + case TYPE_IP: + p.decodeIp() + case TYPE_IP6: + p.decodeIp6() + case TYPE_ARP: + p.decodeArp() + case TYPE_VLAN: + p.decodeVlan() + } +} + +func (p *Packet) headerString(headers []interface{}) string { + // If there's just one header, return that. + if len(headers) == 1 { + if hdr, ok := headers[0].(fmt.Stringer); ok { + return hdr.String() + } + } + // If there are two headers (IPv4/IPv6 -> TCP/UDP/IP..) + if len(headers) == 2 { + // Commonly the first header is an address. + if addr, ok := p.Headers[0].(addrHdr); ok { + if hdr, ok := p.Headers[1].(addrStringer); ok { + return fmt.Sprintf("%s %s", p.Time, hdr.String(addr)) + } + } + } + // For IP in IP, we do a recursive call. + if len(headers) >= 2 { + if addr, ok := headers[0].(addrHdr); ok { + if _, ok := headers[1].(addrHdr); ok { + return fmt.Sprintf("%s > %s IP in IP: ", + addr.SrcAddr(), addr.DestAddr(), p.headerString(headers[1:])) + } + } + } + + var typeNames []string + for _, hdr := range headers { + typeNames = append(typeNames, reflect.TypeOf(hdr).String()) + } + + return fmt.Sprintf("unknown [%s]", strings.Join(typeNames, ",")) +} + +// String prints a one-line representation of the packet header. +// The output is suitable for use in a tcpdump program. +func (p *Packet) String() string { + // If there are no headers, print "unsupported protocol". + if len(p.Headers) == 0 { + return fmt.Sprintf("%s unsupported protocol %d", p.Time, int(p.Type)) + } + return fmt.Sprintf("%s %s", p.Time, p.headerString(p.Headers)) +} + +// Arphdr is a ARP packet header. +type Arphdr struct { + Addrtype uint16 + Protocol uint16 + HwAddressSize uint8 + ProtAddressSize uint8 + Operation uint16 + SourceHwAddress []byte + SourceProtAddress []byte + DestHwAddress []byte + DestProtAddress []byte +} + +func (arp *Arphdr) String() (s string) { + switch arp.Operation { + case 1: + s = "ARP request" + case 2: + s = "ARP Reply" + } + if arp.Addrtype == LINKTYPE_ETHERNET && arp.Protocol == TYPE_IP { + s = fmt.Sprintf("%012x (%s) > %012x (%s)", + decodemac(arp.SourceHwAddress), arp.SourceProtAddress, + decodemac(arp.DestHwAddress), arp.DestProtAddress) + } else { + s = fmt.Sprintf("addrtype = %d protocol = %d", arp.Addrtype, arp.Protocol) + } + return +} + +func (p *Packet) decodeArp() { + if len(p.Payload) < 8 { + return + } + + pkt := p.Payload + arp := new(Arphdr) + arp.Addrtype = binary.BigEndian.Uint16(pkt[0:2]) + arp.Protocol = binary.BigEndian.Uint16(pkt[2:4]) + arp.HwAddressSize = pkt[4] + arp.ProtAddressSize = pkt[5] + arp.Operation = binary.BigEndian.Uint16(pkt[6:8]) + + if len(pkt) < int(8+2*arp.HwAddressSize+2*arp.ProtAddressSize) { + return + } + arp.SourceHwAddress = pkt[8 : 8+arp.HwAddressSize] + arp.SourceProtAddress = pkt[8+arp.HwAddressSize : 8+arp.HwAddressSize+arp.ProtAddressSize] + arp.DestHwAddress = pkt[8+arp.HwAddressSize+arp.ProtAddressSize : 8+2*arp.HwAddressSize+arp.ProtAddressSize] + arp.DestProtAddress = pkt[8+2*arp.HwAddressSize+arp.ProtAddressSize : 8+2*arp.HwAddressSize+2*arp.ProtAddressSize] + + p.Headers = append(p.Headers, arp) + + if len(pkt) >= int(8+2*arp.HwAddressSize+2*arp.ProtAddressSize) { + p.Payload = p.Payload[8+2*arp.HwAddressSize+2*arp.ProtAddressSize:] + } +} + +// IPadr is the header of an IP packet. +type Iphdr struct { + Version uint8 + Ihl uint8 + Tos uint8 + Length uint16 + Id uint16 + Flags uint8 + FragOffset uint16 + Ttl uint8 + Protocol uint8 + Checksum uint16 + SrcIp []byte + DestIp []byte +} + +func (p *Packet) decodeIp() { + if len(p.Payload) < 20 { + return + } + + pkt := p.Payload + ip := new(Iphdr) + + ip.Version = uint8(pkt[0]) >> 4 + ip.Ihl = uint8(pkt[0]) & 0x0F + ip.Tos = pkt[1] + ip.Length = binary.BigEndian.Uint16(pkt[2:4]) + ip.Id = binary.BigEndian.Uint16(pkt[4:6]) + flagsfrags := binary.BigEndian.Uint16(pkt[6:8]) + ip.Flags = uint8(flagsfrags >> 13) + ip.FragOffset = flagsfrags & 0x1FFF + ip.Ttl = pkt[8] + ip.Protocol = pkt[9] + ip.Checksum = binary.BigEndian.Uint16(pkt[10:12]) + ip.SrcIp = pkt[12:16] + ip.DestIp = pkt[16:20] + + pEnd := int(ip.Length) + if pEnd > len(pkt) { + pEnd = len(pkt) + } + + if len(pkt) >= pEnd && int(ip.Ihl*4) < pEnd { + p.Payload = pkt[ip.Ihl*4 : pEnd] + } else { + p.Payload = []byte{} + } + + p.Headers = append(p.Headers, ip) + p.IP = ip + + switch ip.Protocol { + case IP_TCP: + p.decodeTcp() + case IP_UDP: + p.decodeUdp() + case IP_ICMP: + p.decodeIcmp() + case IP_INIP: + p.decodeIp() + } +} + +func (ip *Iphdr) SrcAddr() string { return net.IP(ip.SrcIp).String() } +func (ip *Iphdr) DestAddr() string { return net.IP(ip.DestIp).String() } +func (ip *Iphdr) Len() int { return int(ip.Length) } + +type Vlanhdr struct { + Priority byte + DropEligible bool + VlanIdentifier int + Type int // Not actually part of the vlan header, but the type of the actual packet +} + +func (v *Vlanhdr) String() { + fmt.Sprintf("VLAN Priority:%d Drop:%v Tag:%d", v.Priority, v.DropEligible, v.VlanIdentifier) +} + +func (p *Packet) decodeVlan() { + pkt := p.Payload + vlan := new(Vlanhdr) + if len(pkt) < 4 { + return + } + + vlan.Priority = (pkt[2] & 0xE0) >> 13 + vlan.DropEligible = pkt[2]&0x10 != 0 + vlan.VlanIdentifier = int(binary.BigEndian.Uint16(pkt[:2])) & 0x0FFF + vlan.Type = int(binary.BigEndian.Uint16(p.Payload[2:4])) + p.Headers = append(p.Headers, vlan) + + if len(pkt) >= 5 { + p.Payload = p.Payload[4:] + } + + switch vlan.Type { + case TYPE_IP: + p.decodeIp() + case TYPE_IP6: + p.decodeIp6() + case TYPE_ARP: + p.decodeArp() + } +} + +type Tcphdr struct { + SrcPort uint16 + DestPort uint16 + Seq uint32 + Ack uint32 + DataOffset uint8 + Flags uint16 + Window uint16 + Checksum uint16 + Urgent uint16 + Data []byte +} + +const ( + TCP_FIN = 1 << iota + TCP_SYN + TCP_RST + TCP_PSH + TCP_ACK + TCP_URG + TCP_ECE + TCP_CWR + TCP_NS +) + +func (p *Packet) decodeTcp() { + if len(p.Payload) < 20 { + return + } + + pkt := p.Payload + tcp := new(Tcphdr) + tcp.SrcPort = binary.BigEndian.Uint16(pkt[0:2]) + tcp.DestPort = binary.BigEndian.Uint16(pkt[2:4]) + tcp.Seq = binary.BigEndian.Uint32(pkt[4:8]) + tcp.Ack = binary.BigEndian.Uint32(pkt[8:12]) + tcp.DataOffset = (pkt[12] & 0xF0) >> 4 + tcp.Flags = binary.BigEndian.Uint16(pkt[12:14]) & 0x1FF + tcp.Window = binary.BigEndian.Uint16(pkt[14:16]) + tcp.Checksum = binary.BigEndian.Uint16(pkt[16:18]) + tcp.Urgent = binary.BigEndian.Uint16(pkt[18:20]) + if len(pkt) >= int(tcp.DataOffset*4) { + p.Payload = pkt[tcp.DataOffset*4:] + } + p.Headers = append(p.Headers, tcp) + p.TCP = tcp +} + +func (tcp *Tcphdr) String(hdr addrHdr) string { + return fmt.Sprintf("TCP %s:%d > %s:%d %s SEQ=%d ACK=%d LEN=%d", + hdr.SrcAddr(), int(tcp.SrcPort), hdr.DestAddr(), int(tcp.DestPort), + tcp.FlagsString(), int64(tcp.Seq), int64(tcp.Ack), hdr.Len()) +} + +func (tcp *Tcphdr) FlagsString() string { + var sflags []string + if 0 != (tcp.Flags & TCP_SYN) { + sflags = append(sflags, "syn") + } + if 0 != (tcp.Flags & TCP_FIN) { + sflags = append(sflags, "fin") + } + if 0 != (tcp.Flags & TCP_ACK) { + sflags = append(sflags, "ack") + } + if 0 != (tcp.Flags & TCP_PSH) { + sflags = append(sflags, "psh") + } + if 0 != (tcp.Flags & TCP_RST) { + sflags = append(sflags, "rst") + } + if 0 != (tcp.Flags & TCP_URG) { + sflags = append(sflags, "urg") + } + if 0 != (tcp.Flags & TCP_NS) { + sflags = append(sflags, "ns") + } + if 0 != (tcp.Flags & TCP_CWR) { + sflags = append(sflags, "cwr") + } + if 0 != (tcp.Flags & TCP_ECE) { + sflags = append(sflags, "ece") + } + return fmt.Sprintf("[%s]", strings.Join(sflags, " ")) +} + +type Udphdr struct { + SrcPort uint16 + DestPort uint16 + Length uint16 + Checksum uint16 +} + +func (p *Packet) decodeUdp() { + if len(p.Payload) < 8 { + return + } + + pkt := p.Payload + udp := new(Udphdr) + udp.SrcPort = binary.BigEndian.Uint16(pkt[0:2]) + udp.DestPort = binary.BigEndian.Uint16(pkt[2:4]) + udp.Length = binary.BigEndian.Uint16(pkt[4:6]) + udp.Checksum = binary.BigEndian.Uint16(pkt[6:8]) + p.Headers = append(p.Headers, udp) + p.UDP = udp + if len(p.Payload) >= 8 { + p.Payload = pkt[8:] + } +} + +func (udp *Udphdr) String(hdr addrHdr) string { + return fmt.Sprintf("UDP %s:%d > %s:%d LEN=%d CHKSUM=%d", + hdr.SrcAddr(), int(udp.SrcPort), hdr.DestAddr(), int(udp.DestPort), + int(udp.Length), int(udp.Checksum)) +} + +type Icmphdr struct { + Type uint8 + Code uint8 + Checksum uint16 + Id uint16 + Seq uint16 + Data []byte +} + +func (p *Packet) decodeIcmp() *Icmphdr { + if len(p.Payload) < 8 { + return nil + } + + pkt := p.Payload + icmp := new(Icmphdr) + icmp.Type = pkt[0] + icmp.Code = pkt[1] + icmp.Checksum = binary.BigEndian.Uint16(pkt[2:4]) + icmp.Id = binary.BigEndian.Uint16(pkt[4:6]) + icmp.Seq = binary.BigEndian.Uint16(pkt[6:8]) + p.Payload = pkt[8:] + p.Headers = append(p.Headers, icmp) + return icmp +} + +func (icmp *Icmphdr) String(hdr addrHdr) string { + return fmt.Sprintf("ICMP %s > %s Type = %d Code = %d ", + hdr.SrcAddr(), hdr.DestAddr(), icmp.Type, icmp.Code) +} + +func (icmp *Icmphdr) TypeString() (result string) { + switch icmp.Type { + case 0: + result = fmt.Sprintf("Echo reply seq=%d", icmp.Seq) + case 3: + switch icmp.Code { + case 0: + result = "Network unreachable" + case 1: + result = "Host unreachable" + case 2: + result = "Protocol unreachable" + case 3: + result = "Port unreachable" + default: + result = "Destination unreachable" + } + case 8: + result = fmt.Sprintf("Echo request seq=%d", icmp.Seq) + case 30: + result = "Traceroute" + } + return +} + +type Ip6hdr struct { + // http://www.networksorcery.com/enp/protocol/ipv6.htm + Version uint8 // 4 bits + TrafficClass uint8 // 8 bits + FlowLabel uint32 // 20 bits + Length uint16 // 16 bits + NextHeader uint8 // 8 bits, same as Protocol in Iphdr + HopLimit uint8 // 8 bits + SrcIp []byte // 16 bytes + DestIp []byte // 16 bytes +} + +func (p *Packet) decodeIp6() { + if len(p.Payload) < 40 { + return + } + + pkt := p.Payload + ip6 := new(Ip6hdr) + ip6.Version = uint8(pkt[0]) >> 4 + ip6.TrafficClass = uint8((binary.BigEndian.Uint16(pkt[0:2]) >> 4) & 0x00FF) + ip6.FlowLabel = binary.BigEndian.Uint32(pkt[0:4]) & 0x000FFFFF + ip6.Length = binary.BigEndian.Uint16(pkt[4:6]) + ip6.NextHeader = pkt[6] + ip6.HopLimit = pkt[7] + ip6.SrcIp = pkt[8:24] + ip6.DestIp = pkt[24:40] + + if len(p.Payload) >= 40 { + p.Payload = pkt[40:] + } + + p.Headers = append(p.Headers, ip6) + + switch ip6.NextHeader { + case IP_TCP: + p.decodeTcp() + case IP_UDP: + p.decodeUdp() + case IP_ICMP: + p.decodeIcmp() + case IP_INIP: + p.decodeIp() + } +} + +func (ip6 *Ip6hdr) SrcAddr() string { return net.IP(ip6.SrcIp).String() } +func (ip6 *Ip6hdr) DestAddr() string { return net.IP(ip6.DestIp).String() } +func (ip6 *Ip6hdr) Len() int { return int(ip6.Length) } diff --git a/Godeps/_workspace/src/github.com/akrennmair/gopcap/decode_test.go b/Godeps/_workspace/src/github.com/akrennmair/gopcap/decode_test.go new file mode 100644 index 000000000..7328c4f12 --- /dev/null +++ b/Godeps/_workspace/src/github.com/akrennmair/gopcap/decode_test.go @@ -0,0 +1,247 @@ +package pcap + +import ( + "bytes" + "testing" + "time" +) + +var testSimpleTcpPacket *Packet = &Packet{ + Data: []byte{ + 0x00, 0x00, 0x0c, 0x9f, 0xf0, 0x20, 0xbc, 0x30, 0x5b, 0xe8, 0xd3, 0x49, + 0x08, 0x00, 0x45, 0x00, 0x01, 0xa4, 0x39, 0xdf, 0x40, 0x00, 0x40, 0x06, + 0x55, 0x5a, 0xac, 0x11, 0x51, 0x49, 0xad, 0xde, 0xfe, 0xe1, 0xc5, 0xf7, + 0x00, 0x50, 0xc5, 0x7e, 0x0e, 0x48, 0x49, 0x07, 0x42, 0x32, 0x80, 0x18, + 0x00, 0x73, 0xab, 0xb1, 0x00, 0x00, 0x01, 0x01, 0x08, 0x0a, 0x03, 0x77, + 0x37, 0x9c, 0x42, 0x77, 0x5e, 0x3a, 0x47, 0x45, 0x54, 0x20, 0x2f, 0x20, + 0x48, 0x54, 0x54, 0x50, 0x2f, 0x31, 0x2e, 0x31, 0x0d, 0x0a, 0x48, 0x6f, + 0x73, 0x74, 0x3a, 0x20, 0x77, 0x77, 0x77, 0x2e, 0x66, 0x69, 0x73, 0x68, + 0x2e, 0x63, 0x6f, 0x6d, 0x0d, 0x0a, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, + 0x74, 0x69, 0x6f, 0x6e, 0x3a, 0x20, 0x6b, 0x65, 0x65, 0x70, 0x2d, 0x61, + 0x6c, 0x69, 0x76, 0x65, 0x0d, 0x0a, 0x55, 0x73, 0x65, 0x72, 0x2d, 0x41, + 0x67, 0x65, 0x6e, 0x74, 0x3a, 0x20, 0x4d, 0x6f, 0x7a, 0x69, 0x6c, 0x6c, + 0x61, 0x2f, 0x35, 0x2e, 0x30, 0x20, 0x28, 0x58, 0x31, 0x31, 0x3b, 0x20, + 0x4c, 0x69, 0x6e, 0x75, 0x78, 0x20, 0x78, 0x38, 0x36, 0x5f, 0x36, 0x34, + 0x29, 0x20, 0x41, 0x70, 0x70, 0x6c, 0x65, 0x57, 0x65, 0x62, 0x4b, 0x69, + 0x74, 0x2f, 0x35, 0x33, 0x35, 0x2e, 0x32, 0x20, 0x28, 0x4b, 0x48, 0x54, + 0x4d, 0x4c, 0x2c, 0x20, 0x6c, 0x69, 0x6b, 0x65, 0x20, 0x47, 0x65, 0x63, + 0x6b, 0x6f, 0x29, 0x20, 0x43, 0x68, 0x72, 0x6f, 0x6d, 0x65, 0x2f, 0x31, + 0x35, 0x2e, 0x30, 0x2e, 0x38, 0x37, 0x34, 0x2e, 0x31, 0x32, 0x31, 0x20, + 0x53, 0x61, 0x66, 0x61, 0x72, 0x69, 0x2f, 0x35, 0x33, 0x35, 0x2e, 0x32, + 0x0d, 0x0a, 0x41, 0x63, 0x63, 0x65, 0x70, 0x74, 0x3a, 0x20, 0x74, 0x65, + 0x78, 0x74, 0x2f, 0x68, 0x74, 0x6d, 0x6c, 0x2c, 0x61, 0x70, 0x70, 0x6c, + 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2f, 0x78, 0x68, 0x74, 0x6d, + 0x6c, 0x2b, 0x78, 0x6d, 0x6c, 0x2c, 0x61, 0x70, 0x70, 0x6c, 0x69, 0x63, + 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2f, 0x78, 0x6d, 0x6c, 0x3b, 0x71, 0x3d, + 0x30, 0x2e, 0x39, 0x2c, 0x2a, 0x2f, 0x2a, 0x3b, 0x71, 0x3d, 0x30, 0x2e, + 0x38, 0x0d, 0x0a, 0x41, 0x63, 0x63, 0x65, 0x70, 0x74, 0x2d, 0x45, 0x6e, + 0x63, 0x6f, 0x64, 0x69, 0x6e, 0x67, 0x3a, 0x20, 0x67, 0x7a, 0x69, 0x70, + 0x2c, 0x64, 0x65, 0x66, 0x6c, 0x61, 0x74, 0x65, 0x2c, 0x73, 0x64, 0x63, + 0x68, 0x0d, 0x0a, 0x41, 0x63, 0x63, 0x65, 0x70, 0x74, 0x2d, 0x4c, 0x61, + 0x6e, 0x67, 0x75, 0x61, 0x67, 0x65, 0x3a, 0x20, 0x65, 0x6e, 0x2d, 0x55, + 0x53, 0x2c, 0x65, 0x6e, 0x3b, 0x71, 0x3d, 0x30, 0x2e, 0x38, 0x0d, 0x0a, + 0x41, 0x63, 0x63, 0x65, 0x70, 0x74, 0x2d, 0x43, 0x68, 0x61, 0x72, 0x73, + 0x65, 0x74, 0x3a, 0x20, 0x49, 0x53, 0x4f, 0x2d, 0x38, 0x38, 0x35, 0x39, + 0x2d, 0x31, 0x2c, 0x75, 0x74, 0x66, 0x2d, 0x38, 0x3b, 0x71, 0x3d, 0x30, + 0x2e, 0x37, 0x2c, 0x2a, 0x3b, 0x71, 0x3d, 0x30, 0x2e, 0x33, 0x0d, 0x0a, + 0x0d, 0x0a, + }} + +func BenchmarkDecodeSimpleTcpPacket(b *testing.B) { + for i := 0; i < b.N; i++ { + testSimpleTcpPacket.Decode() + } +} + +func TestDecodeSimpleTcpPacket(t *testing.T) { + p := testSimpleTcpPacket + p.Decode() + if p.DestMac != 0x00000c9ff020 { + t.Error("Dest mac", p.DestMac) + } + if p.SrcMac != 0xbc305be8d349 { + t.Error("Src mac", p.SrcMac) + } + if len(p.Headers) != 2 { + t.Error("Incorrect number of headers", len(p.Headers)) + return + } + if ip, ipOk := p.Headers[0].(*Iphdr); ipOk { + if ip.Version != 4 { + t.Error("ip Version", ip.Version) + } + if ip.Ihl != 5 { + t.Error("ip header length", ip.Ihl) + } + if ip.Tos != 0 { + t.Error("ip TOS", ip.Tos) + } + if ip.Length != 420 { + t.Error("ip Length", ip.Length) + } + if ip.Id != 14815 { + t.Error("ip ID", ip.Id) + } + if ip.Flags != 0x02 { + t.Error("ip Flags", ip.Flags) + } + if ip.FragOffset != 0 { + t.Error("ip Fragoffset", ip.FragOffset) + } + if ip.Ttl != 64 { + t.Error("ip TTL", ip.Ttl) + } + if ip.Protocol != 6 { + t.Error("ip Protocol", ip.Protocol) + } + if ip.Checksum != 0x555A { + t.Error("ip Checksum", ip.Checksum) + } + if !bytes.Equal(ip.SrcIp, []byte{172, 17, 81, 73}) { + t.Error("ip Src", ip.SrcIp) + } + if !bytes.Equal(ip.DestIp, []byte{173, 222, 254, 225}) { + t.Error("ip Dest", ip.DestIp) + } + if tcp, tcpOk := p.Headers[1].(*Tcphdr); tcpOk { + if tcp.SrcPort != 50679 { + t.Error("tcp srcport", tcp.SrcPort) + } + if tcp.DestPort != 80 { + t.Error("tcp destport", tcp.DestPort) + } + if tcp.Seq != 0xc57e0e48 { + t.Error("tcp seq", tcp.Seq) + } + if tcp.Ack != 0x49074232 { + t.Error("tcp ack", tcp.Ack) + } + if tcp.DataOffset != 8 { + t.Error("tcp dataoffset", tcp.DataOffset) + } + if tcp.Flags != 0x18 { + t.Error("tcp flags", tcp.Flags) + } + if tcp.Window != 0x73 { + t.Error("tcp window", tcp.Window) + } + if tcp.Checksum != 0xabb1 { + t.Error("tcp checksum", tcp.Checksum) + } + if tcp.Urgent != 0 { + t.Error("tcp urgent", tcp.Urgent) + } + } else { + t.Error("Second header is not TCP header") + } + } else { + t.Error("First header is not IP header") + } + if string(p.Payload) != "GET / HTTP/1.1\r\nHost: www.fish.com\r\nConnection: keep-alive\r\nUser-Agent: Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/535.2 (KHTML, like Gecko) Chrome/15.0.874.121 Safari/535.2\r\nAccept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8\r\nAccept-Encoding: gzip,deflate,sdch\r\nAccept-Language: en-US,en;q=0.8\r\nAccept-Charset: ISO-8859-1,utf-8;q=0.7,*;q=0.3\r\n\r\n" { + t.Error("--- PAYLOAD STRING ---\n", string(p.Payload), "\n--- PAYLOAD BYTES ---\n", p.Payload) + } +} + +// Makes sure packet payload doesn't display the 6 trailing null of this packet +// as part of the payload. They're actually the ethernet trailer. +func TestDecodeSmallTcpPacketHasEmptyPayload(t *testing.T) { + p := &Packet{ + // This packet is only 54 bits (an empty TCP RST), thus 6 trailing null + // bytes are added by the ethernet layer to make it the minimum packet size. + Data: []byte{ + 0xbc, 0x30, 0x5b, 0xe8, 0xd3, 0x49, 0xb8, 0xac, 0x6f, 0x92, 0xd5, 0xbf, + 0x08, 0x00, 0x45, 0x00, 0x00, 0x28, 0x00, 0x00, 0x40, 0x00, 0x40, 0x06, + 0x3f, 0x9f, 0xac, 0x11, 0x51, 0xc5, 0xac, 0x11, 0x51, 0x49, 0x00, 0x63, + 0x9a, 0xef, 0x00, 0x00, 0x00, 0x00, 0x2e, 0xc1, 0x27, 0x83, 0x50, 0x14, + 0x00, 0x00, 0xc3, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + }} + p.Decode() + if p.Payload == nil { + t.Error("Nil payload") + } + if len(p.Payload) != 0 { + t.Error("Non-empty payload:", p.Payload) + } +} + +func TestDecodeVlanPacket(t *testing.T) { + p := &Packet{ + Data: []byte{ + 0x00, 0x10, 0xdb, 0xff, 0x10, 0x00, 0x00, 0x15, 0x2c, 0x9d, 0xcc, 0x00, 0x81, 0x00, 0x01, 0xf7, + 0x08, 0x00, 0x45, 0x00, 0x00, 0x28, 0x29, 0x8d, 0x40, 0x00, 0x7d, 0x06, 0x83, 0xa0, 0xac, 0x1b, + 0xca, 0x8e, 0x45, 0x16, 0x94, 0xe2, 0xd4, 0x0a, 0x00, 0x50, 0xdf, 0xab, 0x9c, 0xc6, 0xcd, 0x1e, + 0xe5, 0xd1, 0x50, 0x10, 0x01, 0x00, 0x5a, 0x74, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + }} + p.Decode() + if p.Type != TYPE_VLAN { + t.Error("Didn't detect vlan") + } + if len(p.Headers) != 3 { + t.Error("Incorrect number of headers:", len(p.Headers)) + for i, h := range p.Headers { + t.Errorf("Header %d: %#v", i, h) + } + t.FailNow() + } + if _, ok := p.Headers[0].(*Vlanhdr); !ok { + t.Errorf("First header isn't vlan: %q", p.Headers[0]) + } + if _, ok := p.Headers[1].(*Iphdr); !ok { + t.Errorf("Second header isn't IP: %q", p.Headers[1]) + } + if _, ok := p.Headers[2].(*Tcphdr); !ok { + t.Errorf("Third header isn't TCP: %q", p.Headers[2]) + } +} + +func TestDecodeFuzzFallout(t *testing.T) { + testData := []struct { + Data []byte + }{ + {[]byte("000000000000\x81\x000")}, + {[]byte("000000000000\x81\x00000")}, + {[]byte("000000000000\x86\xdd0")}, + {[]byte("000000000000\b\x000")}, + {[]byte("000000000000\b\x060")}, + {[]byte{}}, + {[]byte("000000000000\b\x0600000000")}, + {[]byte("000000000000\x86\xdd000000\x01000000000000000000000000000000000")}, + {[]byte("000000000000\x81\x0000\b\x0600000000")}, + {[]byte("000000000000\b\x00n0000000000000000000")}, + {[]byte("000000000000\x86\xdd000000\x0100000000000000000000000000000000000")}, + {[]byte("000000000000\x81\x0000\b\x00g0000000000000000000")}, + //{[]byte()}, + {[]byte("000000000000\b\x00400000000\x110000000000")}, + {[]byte("0nMØ¡\xfe\x13\x13\x81\x00gr\b\x00&x\xc9\xe5b'\x1e0\x00\x04\x00\x0020596224")}, + {[]byte("000000000000\x81\x0000\b\x00400000000\x110000000000")}, + {[]byte("000000000000\b\x00000000000\x0600\xff0000000")}, + {[]byte("000000000000\x86\xdd000000\x06000000000000000000000000000000000")}, + {[]byte("000000000000\x81\x0000\b\x00000000000\x0600b0000000")}, + {[]byte("000000000000\x81\x0000\b\x00400000000\x060000000000")}, + {[]byte("000000000000\x86\xdd000000\x11000000000000000000000000000000000")}, + {[]byte("000000000000\x86\xdd000000\x0600000000000000000000000000000000000000000000M")}, + {[]byte("000000000000\b\x00500000000\x0600000000000")}, + {[]byte("0nM\xd80\xfe\x13\x13\x81\x00gr\b\x00&x\xc9\xe5b'\x1e0\x00\x04\x00\x0020596224")}, + } + + for _, entry := range testData { + pkt := &Packet{ + Time: time.Now(), + Caplen: uint32(len(entry.Data)), + Len: uint32(len(entry.Data)), + Data: entry.Data, + } + + pkt.Decode() + /* + func() { + defer func() { + if err := recover(); err != nil { + t.Fatalf("%d. %q failed: %v", idx, string(entry.Data), err) + } + }() + pkt.Decode() + }() + */ + } +} diff --git a/Godeps/_workspace/src/github.com/akrennmair/gopcap/io.go b/Godeps/_workspace/src/github.com/akrennmair/gopcap/io.go new file mode 100644 index 000000000..b00bee993 --- /dev/null +++ b/Godeps/_workspace/src/github.com/akrennmair/gopcap/io.go @@ -0,0 +1,206 @@ +package pcap + +import ( + "encoding/binary" + "fmt" + "io" + "time" +) + +// FileHeader is the parsed header of a pcap file. +// http://wiki.wireshark.org/Development/LibpcapFileFormat +type FileHeader struct { + MagicNumber uint32 + VersionMajor uint16 + VersionMinor uint16 + TimeZone int32 + SigFigs uint32 + SnapLen uint32 + Network uint32 +} + +type PacketTime struct { + Sec int32 + Usec int32 +} + +// Convert the PacketTime to a go Time struct. +func (p *PacketTime) Time() time.Time { + return time.Unix(int64(p.Sec), int64(p.Usec)*1000) +} + +// Packet is a single packet parsed from a pcap file. +// +// Convenient access to IP, TCP, and UDP headers is provided after Decode() +// is called if the packet is of the appropriate type. +type Packet struct { + Time time.Time // packet send/receive time + Caplen uint32 // bytes stored in the file (caplen <= len) + Len uint32 // bytes sent/received + Data []byte // packet data + + Type int // protocol type, see LINKTYPE_* + DestMac uint64 + SrcMac uint64 + + Headers []interface{} // decoded headers, in order + Payload []byte // remaining non-header bytes + + IP *Iphdr // IP header (for IP packets, after decoding) + TCP *Tcphdr // TCP header (for TCP packets, after decoding) + UDP *Udphdr // UDP header (for UDP packets after decoding) +} + +// Reader parses pcap files. +type Reader struct { + flip bool + buf io.Reader + err error + fourBytes []byte + twoBytes []byte + sixteenBytes []byte + Header FileHeader +} + +// NewReader reads pcap data from an io.Reader. +func NewReader(reader io.Reader) (*Reader, error) { + r := &Reader{ + buf: reader, + fourBytes: make([]byte, 4), + twoBytes: make([]byte, 2), + sixteenBytes: make([]byte, 16), + } + switch magic := r.readUint32(); magic { + case 0xa1b2c3d4: + r.flip = false + case 0xd4c3b2a1: + r.flip = true + default: + return nil, fmt.Errorf("pcap: bad magic number: %0x", magic) + } + r.Header = FileHeader{ + MagicNumber: 0xa1b2c3d4, + VersionMajor: r.readUint16(), + VersionMinor: r.readUint16(), + TimeZone: r.readInt32(), + SigFigs: r.readUint32(), + SnapLen: r.readUint32(), + Network: r.readUint32(), + } + return r, nil +} + +// Next returns the next packet or nil if no more packets can be read. +func (r *Reader) Next() *Packet { + d := r.sixteenBytes + r.err = r.read(d) + if r.err != nil { + return nil + } + timeSec := asUint32(d[0:4], r.flip) + timeUsec := asUint32(d[4:8], r.flip) + capLen := asUint32(d[8:12], r.flip) + origLen := asUint32(d[12:16], r.flip) + + data := make([]byte, capLen) + if r.err = r.read(data); r.err != nil { + return nil + } + return &Packet{ + Time: time.Unix(int64(timeSec), int64(timeUsec)), + Caplen: capLen, + Len: origLen, + Data: data, + } +} + +func (r *Reader) read(data []byte) error { + var err error + n, err := r.buf.Read(data) + for err == nil && n != len(data) { + var chunk int + chunk, err = r.buf.Read(data[n:]) + n += chunk + } + if len(data) == n { + return nil + } + return err +} + +func (r *Reader) readUint32() uint32 { + data := r.fourBytes + if r.err = r.read(data); r.err != nil { + return 0 + } + return asUint32(data, r.flip) +} + +func (r *Reader) readInt32() int32 { + data := r.fourBytes + if r.err = r.read(data); r.err != nil { + return 0 + } + return int32(asUint32(data, r.flip)) +} + +func (r *Reader) readUint16() uint16 { + data := r.twoBytes + if r.err = r.read(data); r.err != nil { + return 0 + } + return asUint16(data, r.flip) +} + +// Writer writes a pcap file. +type Writer struct { + writer io.Writer + buf []byte +} + +// NewWriter creates a Writer that stores output in an io.Writer. +// The FileHeader is written immediately. +func NewWriter(writer io.Writer, header *FileHeader) (*Writer, error) { + w := &Writer{ + writer: writer, + buf: make([]byte, 24), + } + binary.LittleEndian.PutUint32(w.buf, header.MagicNumber) + binary.LittleEndian.PutUint16(w.buf[4:], header.VersionMajor) + binary.LittleEndian.PutUint16(w.buf[6:], header.VersionMinor) + binary.LittleEndian.PutUint32(w.buf[8:], uint32(header.TimeZone)) + binary.LittleEndian.PutUint32(w.buf[12:], header.SigFigs) + binary.LittleEndian.PutUint32(w.buf[16:], header.SnapLen) + binary.LittleEndian.PutUint32(w.buf[20:], header.Network) + if _, err := writer.Write(w.buf); err != nil { + return nil, err + } + return w, nil +} + +// Writer writes a packet to the underlying writer. +func (w *Writer) Write(pkt *Packet) error { + binary.LittleEndian.PutUint32(w.buf, uint32(pkt.Time.Unix())) + binary.LittleEndian.PutUint32(w.buf[4:], uint32(pkt.Time.Nanosecond())) + binary.LittleEndian.PutUint32(w.buf[8:], uint32(pkt.Time.Unix())) + binary.LittleEndian.PutUint32(w.buf[12:], pkt.Len) + if _, err := w.writer.Write(w.buf[:16]); err != nil { + return err + } + _, err := w.writer.Write(pkt.Data) + return err +} + +func asUint32(data []byte, flip bool) uint32 { + if flip { + return binary.BigEndian.Uint32(data) + } + return binary.LittleEndian.Uint32(data) +} + +func asUint16(data []byte, flip bool) uint16 { + if flip { + return binary.BigEndian.Uint16(data) + } + return binary.LittleEndian.Uint16(data) +} diff --git a/Godeps/_workspace/src/github.com/akrennmair/gopcap/pcap.go b/Godeps/_workspace/src/github.com/akrennmair/gopcap/pcap.go new file mode 100644 index 000000000..1b393c552 --- /dev/null +++ b/Godeps/_workspace/src/github.com/akrennmair/gopcap/pcap.go @@ -0,0 +1,266 @@ +// Interface to both live and offline pcap parsing. +package pcap + +/* +#cgo linux LDFLAGS: -lpcap +#cgo freebsd LDFLAGS: -lpcap +#cgo darwin LDFLAGS: -lpcap +#cgo windows CFLAGS: -I C:/WpdPack/Include +#cgo windows,386 LDFLAGS: -L C:/WpdPack/Lib -lwpcap +#cgo windows,amd64 LDFLAGS: -L C:/WpdPack/Lib/x64 -lwpcap +#include +#include + +// Workaround for not knowing how to cast to const u_char** +int hack_pcap_next_ex(pcap_t *p, struct pcap_pkthdr **pkt_header, + u_char **pkt_data) { + return pcap_next_ex(p, pkt_header, (const u_char **)pkt_data); +} +*/ +import "C" +import ( + "errors" + "net" + "syscall" + "time" + "unsafe" +) + +type Pcap struct { + cptr *C.pcap_t +} + +type Stat struct { + PacketsReceived uint32 + PacketsDropped uint32 + PacketsIfDropped uint32 +} + +type Interface struct { + Name string + Description string + Addresses []IFAddress + // TODO: add more elements +} + +type IFAddress struct { + IP net.IP + Netmask net.IPMask + // TODO: add broadcast + PtP dst ? +} + +func (p *Pcap) Next() (pkt *Packet) { + rv, _ := p.NextEx() + return rv +} + +// Openlive opens a device and returns a *Pcap handler +func Openlive(device string, snaplen int32, promisc bool, timeout_ms int32) (handle *Pcap, err error) { + var buf *C.char + buf = (*C.char)(C.calloc(ERRBUF_SIZE, 1)) + h := new(Pcap) + var pro int32 + if promisc { + pro = 1 + } + + dev := C.CString(device) + defer C.free(unsafe.Pointer(dev)) + + h.cptr = C.pcap_open_live(dev, C.int(snaplen), C.int(pro), C.int(timeout_ms), buf) + if nil == h.cptr { + handle = nil + err = errors.New(C.GoString(buf)) + } else { + handle = h + } + C.free(unsafe.Pointer(buf)) + return +} + +func Openoffline(file string) (handle *Pcap, err error) { + var buf *C.char + buf = (*C.char)(C.calloc(ERRBUF_SIZE, 1)) + h := new(Pcap) + + cf := C.CString(file) + defer C.free(unsafe.Pointer(cf)) + + h.cptr = C.pcap_open_offline(cf, buf) + if nil == h.cptr { + handle = nil + err = errors.New(C.GoString(buf)) + } else { + handle = h + } + C.free(unsafe.Pointer(buf)) + return +} + +func (p *Pcap) NextEx() (pkt *Packet, result int32) { + var pkthdr *C.struct_pcap_pkthdr + + var buf_ptr *C.u_char + var buf unsafe.Pointer + result = int32(C.hack_pcap_next_ex(p.cptr, &pkthdr, &buf_ptr)) + + buf = unsafe.Pointer(buf_ptr) + if nil == buf { + return + } + + pkt = new(Packet) + pkt.Time = time.Unix(int64(pkthdr.ts.tv_sec), int64(pkthdr.ts.tv_usec)*1000) + pkt.Caplen = uint32(pkthdr.caplen) + pkt.Len = uint32(pkthdr.len) + pkt.Data = C.GoBytes(buf, C.int(pkthdr.caplen)) + return +} + +func (p *Pcap) Close() { + C.pcap_close(p.cptr) +} + +func (p *Pcap) Geterror() error { + return errors.New(C.GoString(C.pcap_geterr(p.cptr))) +} + +func (p *Pcap) Getstats() (stat *Stat, err error) { + var cstats _Ctype_struct_pcap_stat + if -1 == C.pcap_stats(p.cptr, &cstats) { + return nil, p.Geterror() + } + stats := new(Stat) + stats.PacketsReceived = uint32(cstats.ps_recv) + stats.PacketsDropped = uint32(cstats.ps_drop) + stats.PacketsIfDropped = uint32(cstats.ps_ifdrop) + + return stats, nil +} + +func (p *Pcap) Setfilter(expr string) (err error) { + var bpf _Ctype_struct_bpf_program + cexpr := C.CString(expr) + defer C.free(unsafe.Pointer(cexpr)) + + if -1 == C.pcap_compile(p.cptr, &bpf, cexpr, 1, 0) { + return p.Geterror() + } + + if -1 == C.pcap_setfilter(p.cptr, &bpf) { + C.pcap_freecode(&bpf) + return p.Geterror() + } + C.pcap_freecode(&bpf) + return nil +} + +func Version() string { + return C.GoString(C.pcap_lib_version()) +} + +func (p *Pcap) Datalink() int { + return int(C.pcap_datalink(p.cptr)) +} + +func (p *Pcap) Setdatalink(dlt int) error { + if -1 == C.pcap_set_datalink(p.cptr, C.int(dlt)) { + return p.Geterror() + } + return nil +} + +func DatalinkValueToName(dlt int) string { + if name := C.pcap_datalink_val_to_name(C.int(dlt)); name != nil { + return C.GoString(name) + } + return "" +} + +func DatalinkValueToDescription(dlt int) string { + if desc := C.pcap_datalink_val_to_description(C.int(dlt)); desc != nil { + return C.GoString(desc) + } + return "" +} + +func Findalldevs() (ifs []Interface, err error) { + var buf *C.char + buf = (*C.char)(C.calloc(ERRBUF_SIZE, 1)) + defer C.free(unsafe.Pointer(buf)) + var alldevsp *C.pcap_if_t + + if -1 == C.pcap_findalldevs((**C.pcap_if_t)(&alldevsp), buf) { + return nil, errors.New(C.GoString(buf)) + } + defer C.pcap_freealldevs((*C.pcap_if_t)(alldevsp)) + dev := alldevsp + var i uint32 + for i = 0; dev != nil; dev = (*C.pcap_if_t)(dev.next) { + i++ + } + ifs = make([]Interface, i) + dev = alldevsp + for j := uint32(0); dev != nil; dev = (*C.pcap_if_t)(dev.next) { + var iface Interface + iface.Name = C.GoString(dev.name) + iface.Description = C.GoString(dev.description) + iface.Addresses = findalladdresses(dev.addresses) + // TODO: add more elements + ifs[j] = iface + j++ + } + return +} + +func findalladdresses(addresses *_Ctype_struct_pcap_addr) (retval []IFAddress) { + // TODO - make it support more than IPv4 and IPv6? + retval = make([]IFAddress, 0, 1) + for curaddr := addresses; curaddr != nil; curaddr = (*_Ctype_struct_pcap_addr)(curaddr.next) { + var a IFAddress + var err error + if a.IP, err = sockaddr_to_IP((*syscall.RawSockaddr)(unsafe.Pointer(curaddr.addr))); err != nil { + continue + } + if a.Netmask, err = sockaddr_to_IP((*syscall.RawSockaddr)(unsafe.Pointer(curaddr.addr))); err != nil { + continue + } + retval = append(retval, a) + } + return +} + +func sockaddr_to_IP(rsa *syscall.RawSockaddr) (IP []byte, err error) { + switch rsa.Family { + case syscall.AF_INET: + pp := (*syscall.RawSockaddrInet4)(unsafe.Pointer(rsa)) + IP = make([]byte, 4) + for i := 0; i < len(IP); i++ { + IP[i] = pp.Addr[i] + } + return + case syscall.AF_INET6: + pp := (*syscall.RawSockaddrInet6)(unsafe.Pointer(rsa)) + IP = make([]byte, 16) + for i := 0; i < len(IP); i++ { + IP[i] = pp.Addr[i] + } + return + } + err = errors.New("Unsupported address type") + return +} + +func (p *Pcap) Inject(data []byte) (err error) { + buf := (*C.char)(C.malloc((C.size_t)(len(data)))) + + for i := 0; i < len(data); i++ { + *(*byte)(unsafe.Pointer(uintptr(unsafe.Pointer(buf)) + uintptr(i))) = data[i] + } + + if -1 == C.pcap_sendpacket(p.cptr, (*C.u_char)(unsafe.Pointer(buf)), (C.int)(len(data))) { + err = p.Geterror() + } + C.free(unsafe.Pointer(buf)) + return +} diff --git a/Godeps/_workspace/src/github.com/akrennmair/gopcap/tools/benchmark/benchmark.go b/Godeps/_workspace/src/github.com/akrennmair/gopcap/tools/benchmark/benchmark.go new file mode 100644 index 000000000..e1303e305 --- /dev/null +++ b/Godeps/_workspace/src/github.com/akrennmair/gopcap/tools/benchmark/benchmark.go @@ -0,0 +1,49 @@ +package main + +import ( + "flag" + "fmt" + "os" + "runtime/pprof" + "time" + + "github.com/coreos/etcd/Godeps/_workspace/src/github.com/akrennmair/gopcap" +) + +func main() { + var filename *string = flag.String("file", "", "filename") + var decode *bool = flag.Bool("d", false, "If true, decode each packet") + var cpuprofile *string = flag.String("cpuprofile", "", "filename") + + flag.Parse() + + h, err := pcap.Openoffline(*filename) + if err != nil { + fmt.Printf("Couldn't create pcap reader: %v", err) + } + + if *cpuprofile != "" { + if out, err := os.Create(*cpuprofile); err == nil { + pprof.StartCPUProfile(out) + defer func() { + pprof.StopCPUProfile() + out.Close() + }() + } else { + panic(err) + } + } + + i, nilPackets := 0, 0 + start := time.Now() + for pkt, code := h.NextEx(); code != -2; pkt, code = h.NextEx() { + if pkt == nil { + nilPackets++ + } else if *decode { + pkt.Decode() + } + i++ + } + duration := time.Since(start) + fmt.Printf("Took %v to process %v packets, %v per packet, %d nil packets\n", duration, i, duration/time.Duration(i), nilPackets) +} diff --git a/Godeps/_workspace/src/github.com/akrennmair/gopcap/tools/pass/pass.go b/Godeps/_workspace/src/github.com/akrennmair/gopcap/tools/pass/pass.go new file mode 100644 index 000000000..5a46ce09c --- /dev/null +++ b/Godeps/_workspace/src/github.com/akrennmair/gopcap/tools/pass/pass.go @@ -0,0 +1,96 @@ +package main + +// Parses a pcap file, writes it back to disk, then verifies the files +// are the same. +import ( + "bufio" + "flag" + "fmt" + "io" + "os" + + "github.com/coreos/etcd/Godeps/_workspace/src/github.com/akrennmair/gopcap" +) + +var input *string = flag.String("input", "", "input file") +var output *string = flag.String("output", "", "output file") +var decode *bool = flag.Bool("decode", false, "print decoded packets") + +func copyPcap(dest, src string) { + f, err := os.Open(src) + if err != nil { + fmt.Printf("couldn't open %q: %v\n", src, err) + return + } + defer f.Close() + reader, err := pcap.NewReader(bufio.NewReader(f)) + if err != nil { + fmt.Printf("couldn't create reader: %v\n", err) + return + } + w, err := os.Create(dest) + if err != nil { + fmt.Printf("couldn't open %q: %v\n", dest, err) + return + } + defer w.Close() + buf := bufio.NewWriter(w) + writer, err := pcap.NewWriter(buf, &reader.Header) + if err != nil { + fmt.Printf("couldn't create writer: %v\n", err) + return + } + for { + pkt := reader.Next() + if pkt == nil { + break + } + if *decode { + pkt.Decode() + fmt.Println(pkt.String()) + } + writer.Write(pkt) + } + buf.Flush() +} + +func check(dest, src string) { + f, err := os.Open(src) + if err != nil { + fmt.Printf("couldn't open %q: %v\n", src, err) + return + } + defer f.Close() + freader := bufio.NewReader(f) + + g, err := os.Open(dest) + if err != nil { + fmt.Printf("couldn't open %q: %v\n", src, err) + return + } + defer g.Close() + greader := bufio.NewReader(g) + + for { + fb, ferr := freader.ReadByte() + gb, gerr := greader.ReadByte() + + if ferr == io.EOF && gerr == io.EOF { + break + } + if fb == gb { + continue + } + fmt.Println("FAIL") + return + } + + fmt.Println("PASS") +} + +func main() { + flag.Parse() + + copyPcap(*output, *input) + check(*output, *input) +} diff --git a/Godeps/_workspace/src/github.com/akrennmair/gopcap/tools/pcaptest/pcaptest.go b/Godeps/_workspace/src/github.com/akrennmair/gopcap/tools/pcaptest/pcaptest.go new file mode 100644 index 000000000..1da477fc8 --- /dev/null +++ b/Godeps/_workspace/src/github.com/akrennmair/gopcap/tools/pcaptest/pcaptest.go @@ -0,0 +1,82 @@ +package main + +import ( + "flag" + "fmt" + "time" + + "github.com/coreos/etcd/Godeps/_workspace/src/github.com/akrennmair/gopcap" +) + +func min(x uint32, y uint32) uint32 { + if x < y { + return x + } + return y +} + +func main() { + var device *string = flag.String("d", "", "device") + var file *string = flag.String("r", "", "file") + var expr *string = flag.String("e", "", "filter expression") + + flag.Parse() + + var h *pcap.Pcap + var err error + + ifs, err := pcap.Findalldevs() + if len(ifs) == 0 { + fmt.Printf("Warning: no devices found : %s\n", err) + } else { + for i := 0; i < len(ifs); i++ { + fmt.Printf("dev %d: %s (%s)\n", i+1, ifs[i].Name, ifs[i].Description) + } + } + + if *device != "" { + h, err = pcap.Openlive(*device, 65535, true, 0) + if h == nil { + fmt.Printf("Openlive(%s) failed: %s\n", *device, err) + return + } + } else if *file != "" { + h, err = pcap.Openoffline(*file) + if h == nil { + fmt.Printf("Openoffline(%s) failed: %s\n", *file, err) + return + } + } else { + fmt.Printf("usage: pcaptest [-d | -r ]\n") + return + } + defer h.Close() + + fmt.Printf("pcap version: %s\n", pcap.Version()) + + if *expr != "" { + fmt.Printf("Setting filter: %s\n", *expr) + err := h.Setfilter(*expr) + if err != nil { + fmt.Printf("Warning: setting filter failed: %s\n", err) + } + } + + for pkt := h.Next(); pkt != nil; pkt = h.Next() { + fmt.Printf("time: %d.%06d (%s) caplen: %d len: %d\nData:", + int64(pkt.Time.Second()), int64(pkt.Time.Nanosecond()), + time.Unix(int64(pkt.Time.Second()), 0).String(), int64(pkt.Caplen), int64(pkt.Len)) + for i := uint32(0); i < pkt.Caplen; i++ { + if i%32 == 0 { + fmt.Printf("\n") + } + if 32 <= pkt.Data[i] && pkt.Data[i] <= 126 { + fmt.Printf("%c", pkt.Data[i]) + } else { + fmt.Printf(".") + } + } + fmt.Printf("\n\n") + } + +} diff --git a/Godeps/_workspace/src/github.com/akrennmair/gopcap/tools/tcpdump/tcpdump.go b/Godeps/_workspace/src/github.com/akrennmair/gopcap/tools/tcpdump/tcpdump.go new file mode 100644 index 000000000..0a07bcc9d --- /dev/null +++ b/Godeps/_workspace/src/github.com/akrennmair/gopcap/tools/tcpdump/tcpdump.go @@ -0,0 +1,121 @@ +package main + +import ( + "bufio" + "flag" + "fmt" + "os" + + "github.com/coreos/etcd/Godeps/_workspace/src/github.com/akrennmair/gopcap" +) + +const ( + TYPE_IP = 0x0800 + TYPE_ARP = 0x0806 + TYPE_IP6 = 0x86DD + + IP_ICMP = 1 + IP_INIP = 4 + IP_TCP = 6 + IP_UDP = 17 +) + +var out *bufio.Writer +var errout *bufio.Writer + +func main() { + var device *string = flag.String("i", "", "interface") + var snaplen *int = flag.Int("s", 65535, "snaplen") + var hexdump *bool = flag.Bool("X", false, "hexdump") + expr := "" + + out = bufio.NewWriter(os.Stdout) + errout = bufio.NewWriter(os.Stderr) + + flag.Usage = func() { + fmt.Fprintf(errout, "usage: %s [ -i interface ] [ -s snaplen ] [ -X ] [ expression ]\n", os.Args[0]) + errout.Flush() + os.Exit(1) + } + + flag.Parse() + + if len(flag.Args()) > 0 { + expr = flag.Arg(0) + } + + if *device == "" { + devs, err := pcap.Findalldevs() + if err != nil { + fmt.Fprintf(errout, "tcpdump: couldn't find any devices: %s\n", err) + } + if 0 == len(devs) { + flag.Usage() + } + *device = devs[0].Name + } + + h, err := pcap.Openlive(*device, int32(*snaplen), true, 0) + if h == nil { + fmt.Fprintf(errout, "tcpdump: %s\n", err) + errout.Flush() + return + } + defer h.Close() + + if expr != "" { + ferr := h.Setfilter(expr) + if ferr != nil { + fmt.Fprintf(out, "tcpdump: %s\n", ferr) + out.Flush() + } + } + + for pkt := h.Next(); pkt != nil; pkt = h.Next() { + pkt.Decode() + fmt.Fprintf(out, "%s\n", pkt.String()) + if *hexdump { + Hexdump(pkt) + } + out.Flush() + } +} + +func min(a, b int) int { + if a < b { + return a + } + return b +} + +func Hexdump(pkt *pcap.Packet) { + for i := 0; i < len(pkt.Data); i += 16 { + Dumpline(uint32(i), pkt.Data[i:min(i+16, len(pkt.Data))]) + } +} + +func Dumpline(addr uint32, line []byte) { + fmt.Fprintf(out, "\t0x%04x: ", int32(addr)) + var i uint16 + for i = 0; i < 16 && i < uint16(len(line)); i++ { + if i%2 == 0 { + out.WriteString(" ") + } + fmt.Fprintf(out, "%02x", line[i]) + } + for j := i; j <= 16; j++ { + if j%2 == 0 { + out.WriteString(" ") + } + out.WriteString(" ") + } + out.WriteString(" ") + for i = 0; i < 16 && i < uint16(len(line)); i++ { + if line[i] >= 32 && line[i] <= 126 { + fmt.Fprintf(out, "%c", line[i]) + } else { + out.WriteString(".") + } + } + out.WriteString("\n") +} diff --git a/Godeps/_workspace/src/github.com/spacejam/loghisto/.travis.yml b/Godeps/_workspace/src/github.com/spacejam/loghisto/.travis.yml new file mode 100644 index 000000000..e8197a258 --- /dev/null +++ b/Godeps/_workspace/src/github.com/spacejam/loghisto/.travis.yml @@ -0,0 +1,5 @@ +language: go + +go: + - 1.4 + - tip diff --git a/Godeps/_workspace/src/github.com/spacejam/loghisto/graphite.go b/Godeps/_workspace/src/github.com/spacejam/loghisto/graphite.go new file mode 100644 index 000000000..6d562d0f5 --- /dev/null +++ b/Godeps/_workspace/src/github.com/spacejam/loghisto/graphite.go @@ -0,0 +1,75 @@ +// Copyright 2014 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. See the AUTHORS file +// for names of contributors. +// +// Author: Tyler Neely (t@jujit.su) + +package loghisto + +import ( + "bytes" + "fmt" + "os" + "strings" + +) + +type graphiteStat struct { + Metric string + Time int64 + Value float64 + Host string +} + +type graphiteStatArray []*graphiteStat + +func (stats graphiteStatArray) ToRequest() []byte { + var request bytes.Buffer + for _, stat := range stats { + request.Write([]byte(fmt.Sprintf("cockroach.%s.%s %f %d\n", + stat.Host, + strings.Replace(stat.Metric, "_", ".", -1), + stat.Value, + stat.Time, + ))) + } + return []byte(request.String()) +} + +func (metricSet *ProcessedMetricSet) tographiteStats() graphiteStatArray { + hostname, err := os.Hostname() + if err != nil { + hostname = "unknown" + } + + stats := make([]*graphiteStat, 0, len(metricSet.Metrics)) + i := 0 + for metric, value := range metricSet.Metrics { + //TODO(tyler) custom tags + stats = append(stats, &graphiteStat{ + Metric: metric, + Time: metricSet.Time.Unix(), + Value: value, + Host: hostname, + }) + i++ + } + return stats +} + +// GraphiteProtocol generates a wire representation of a ProcessedMetricSet +// for submission to a Graphite Carbon instance using the plaintext protocol. +func GraphiteProtocol(ms *ProcessedMetricSet) []byte { + return ms.tographiteStats().ToRequest() +} diff --git a/Godeps/_workspace/src/github.com/spacejam/loghisto/graphite_test.go b/Godeps/_workspace/src/github.com/spacejam/loghisto/graphite_test.go new file mode 100644 index 000000000..21cd46f4d --- /dev/null +++ b/Godeps/_workspace/src/github.com/spacejam/loghisto/graphite_test.go @@ -0,0 +1,23 @@ +package loghisto + +import ( + "testing" + "time" +) + +func TestGraphite(t *testing.T) { + ms := NewMetricSystem(time.Second, true) + s := NewSubmitter(ms, GraphiteProtocol, "tcp", "localhost:7777") + s.Start() + + metrics := &ProcessedMetricSet{ + Time: time.Now(), + Metrics: map[string]float64{ + "test.3": 50.54, + "test.4": 10.21, + }, + } + request := s.serializer(metrics) + s.submit(request) + s.Shutdown() +} diff --git a/Godeps/_workspace/src/github.com/spacejam/loghisto/metrics.go b/Godeps/_workspace/src/github.com/spacejam/loghisto/metrics.go new file mode 100644 index 000000000..6ca0a1641 --- /dev/null +++ b/Godeps/_workspace/src/github.com/spacejam/loghisto/metrics.go @@ -0,0 +1,653 @@ +// Copyright 2014 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. See the AUTHORS file +// for names of contributors. +// +// Author: Tyler Neely (t@jujit.su) + +// IMPORTANT: only subscribe to the metric stream +// using buffered channels that are regularly +// flushed, as reaper will NOT block while trying +// to send metrics to a subscriber, and will ignore +// a subscriber if they fail to clear their channel +// 3 times in a row! + +package loghisto + +import ( + "errors" + "fmt" + "math" + "runtime" + "sort" + "sync" + "sync/atomic" + "time" + + "github.com/coreos/etcd/Godeps/_workspace/src/github.com/golang/glog" +) + +const ( + // precision effects the bucketing used during histogram value compression. + precision = 100 +) + +// ProcessedMetricSet contains human-readable metrics that may also be +// suitable for storage in time-series databases. +type ProcessedMetricSet struct { + Time time.Time + Metrics map[string]float64 +} + +// RawMetricSet contains metrics in a form that supports generation of +// percentiles and other rich statistics. +type RawMetricSet struct { + Time time.Time + Counters map[string]uint64 + Rates map[string]uint64 + Histograms map[string]map[int16]*uint64 + Gauges map[string]float64 +} + +// TimerToken facilitates concurrent timings of durations of the same label. +type TimerToken struct { + Name string + Start time.Time + MetricSystem *MetricSystem +} + +// proportion is a compact value with a corresponding count of +// occurrences in this interval. +type proportion struct { + Value float64 + Count uint64 +} + +// proportionArray is a sortable collection of proportion types. +type proportionArray []proportion + +// MetricSystem facilitates the collection and distribution of metrics. +type MetricSystem struct { + // percentiles is a mapping from labels to desired percentiles to be + // calculated by the MetricSystem + percentiles map[string]float64 + // interval is the duration between collections and broadcasts of metrics + // to subscribers. + interval time.Duration + // subscribeToRawMetrics allows subscription to a RawMetricSet generated + // by reaper at the end of each interval on a sent channel. + subscribeToRawMetrics chan chan *RawMetricSet + // unsubscribeFromRawMetrics allows subscribers to unsubscribe from + // receiving a RawMetricSet on the sent channel. + unsubscribeFromRawMetrics chan chan *RawMetricSet + // subscribeToProcessedMetrics allows subscription to a ProcessedMetricSet + // generated by reaper at the end of each interval on a sent channel. + subscribeToProcessedMetrics chan chan *ProcessedMetricSet + // unsubscribeFromProcessedMetrics allows subscribers to unsubscribe from + // receiving a ProcessedMetricSet on the sent channel. + unsubscribeFromProcessedMetrics chan chan *ProcessedMetricSet + // rawSubscribers stores current subscribers to RawMetrics + rawSubscribers map[chan *RawMetricSet]struct{} + // rawBadSubscribers tracks misbehaving subscribers who do not clear their + // subscription channels regularly. + rawBadSubscribers map[chan *RawMetricSet]int + // processedSubscribers stores current subscribers to ProcessedMetrics + processedSubscribers map[chan *ProcessedMetricSet]struct{} + // processedBadSubscribers tracks misbehaving subscribers who do not clear + // their subscription channels regularly. + processedBadSubscribers map[chan *ProcessedMetricSet]int + // subscribersMu controls access to subscription structures + subscribersMu sync.RWMutex + // counterStore maintains the total counts of counters. + counterStore map[string]*uint64 + counterStoreMu sync.RWMutex + // counterCache aggregates new Counters until they are collected by reaper(). + counterCache map[string]*uint64 + // counterMu controls access to counterCache. + counterMu sync.RWMutex + // histogramCache aggregates Histograms until they are collected by reaper(). + histogramCache map[string]map[int16]*uint64 + // histogramMu controls access to histogramCache. + histogramMu sync.RWMutex + // histogramCountStore keeps track of aggregate counts and sums for aggregate + // mean calculation. + histogramCountStore map[string]*uint64 + // histogramCountMu controls access to the histogramCountStore. + histogramCountMu sync.RWMutex + // gaugeFuncs maps metrics to functions used for calculating their value + gaugeFuncs map[string]func() float64 + // gaugeFuncsMu controls access to the gaugeFuncs map. + gaugeFuncsMu sync.Mutex + // Has reaper() been started? + reaping bool + // Close this to bring down this MetricSystem + shutdownChan chan struct{} +} + +// Metrics is the default metric system, which collects and broadcasts metrics +// to subscribers once every 60 seconds. Also includes default system stats. +var Metrics = NewMetricSystem(60*time.Second, true) + +// NewMetricSystem returns a new metric system that collects and broadcasts +// metrics after each interval. +func NewMetricSystem(interval time.Duration, sysStats bool) *MetricSystem { + ms := &MetricSystem{ + percentiles: map[string]float64{ + "%s_min": 0, + "%s_50": .5, + "%s_75": .75, + "%s_90": .9, + "%s_95": .95, + "%s_99": .99, + "%s_99.9": .999, + "%s_99.99": .9999, + "%s_max": 1, + }, + interval: interval, + subscribeToRawMetrics: make(chan chan *RawMetricSet, 64), + unsubscribeFromRawMetrics: make(chan chan *RawMetricSet, 64), + subscribeToProcessedMetrics: make(chan chan *ProcessedMetricSet, 64), + unsubscribeFromProcessedMetrics: make(chan chan *ProcessedMetricSet, 64), + rawSubscribers: make(map[chan *RawMetricSet]struct{}), + rawBadSubscribers: make(map[chan *RawMetricSet]int), + processedSubscribers: make(map[chan *ProcessedMetricSet]struct{}), + processedBadSubscribers: make(map[chan *ProcessedMetricSet]int), + counterStore: make(map[string]*uint64), + counterCache: make(map[string]*uint64), + histogramCache: make(map[string]map[int16]*uint64), + histogramCountStore: make(map[string]*uint64), + gaugeFuncs: make(map[string]func() float64), + shutdownChan: make(chan struct{}), + } + if sysStats { + ms.gaugeFuncsMu.Lock() + ms.gaugeFuncs["sys.Alloc"] = func() float64 { + memStats := new(runtime.MemStats) + runtime.ReadMemStats(memStats) + return float64(memStats.Alloc) + } + ms.gaugeFuncs["sys.NumGC"] = func() float64 { + memStats := new(runtime.MemStats) + runtime.ReadMemStats(memStats) + return float64(memStats.NumGC) + } + ms.gaugeFuncs["sys.PauseTotalNs"] = func() float64 { + memStats := new(runtime.MemStats) + runtime.ReadMemStats(memStats) + return float64(memStats.PauseTotalNs) + } + ms.gaugeFuncs["sys.NumGoroutine"] = func() float64 { + return float64(runtime.NumGoroutine()) + } + ms.gaugeFuncsMu.Unlock() + } + return ms +} + +// SpecifyPercentiles allows users to override the default collected +// and reported percentiles. +func (ms *MetricSystem) SpecifyPercentiles(percentiles map[string]float64) { + ms.percentiles = percentiles +} + +// SubscribeToRawMetrics registers a channel to receive RawMetricSets +// periodically generated by reaper at each interval. +func (ms *MetricSystem) SubscribeToRawMetrics(metricStream chan *RawMetricSet) { + ms.subscribeToRawMetrics <- metricStream +} + +// UnsubscribeFromRawMetrics registers a channel to receive RawMetricSets +// periodically generated by reaper at each interval. +func (ms *MetricSystem) UnsubscribeFromRawMetrics( + metricStream chan *RawMetricSet) { + ms.unsubscribeFromRawMetrics <- metricStream +} + +// SubscribeToProcessedMetrics registers a channel to receive +// ProcessedMetricSets periodically generated by reaper at each interval. +func (ms *MetricSystem) SubscribeToProcessedMetrics( + metricStream chan *ProcessedMetricSet) { + ms.subscribeToProcessedMetrics <- metricStream +} + +// UnsubscribeFromProcessedMetrics registers a channel to receive +// ProcessedMetricSets periodically generated by reaper at each interval. +func (ms *MetricSystem) UnsubscribeFromProcessedMetrics( + metricStream chan *ProcessedMetricSet) { + ms.unsubscribeFromProcessedMetrics <- metricStream +} + +// StartTimer begins a timer and returns a token which is required for halting +// the timer. This allows for concurrent timings under the same name. +func (ms *MetricSystem) StartTimer(name string) TimerToken { + return TimerToken{ + Name: name, + Start: time.Now(), + MetricSystem: ms, + } +} + +// Stop stops a timer given by StartTimer, submits a Histogram of its duration +// in nanoseconds, and returns its duration in nanoseconds. +func (tt *TimerToken) Stop() time.Duration { + duration := time.Since(tt.Start) + tt.MetricSystem.Histogram(tt.Name, float64(duration.Nanoseconds())) + return duration +} + +// Counter is used for recording a running count of the total occurrences of +// a particular event. A rate is also exported for the amount that a counter +// has increased during an interval of this MetricSystem. +func (ms *MetricSystem) Counter(name string, amount uint64) { + ms.counterMu.RLock() + _, exists := ms.counterCache[name] + // perform lock promotion when we need more control + if exists { + atomic.AddUint64(ms.counterCache[name], amount) + ms.counterMu.RUnlock() + } else { + ms.counterMu.RUnlock() + ms.counterMu.Lock() + _, syncExists := ms.counterCache[name] + if !syncExists { + var z uint64 + ms.counterCache[name] = &z + } + atomic.AddUint64(ms.counterCache[name], amount) + ms.counterMu.Unlock() + } +} + +// Histogram is used for generating rich metrics, such as percentiles, from +// periodically occurring continuous values. +func (ms *MetricSystem) Histogram(name string, value float64) { + compressedValue := compress(value) + ms.histogramMu.RLock() + _, present := ms.histogramCache[name][compressedValue] + if present { + atomic.AddUint64(ms.histogramCache[name][compressedValue], 1) + ms.histogramMu.RUnlock() + } else { + ms.histogramMu.RUnlock() + ms.histogramMu.Lock() + _, syncPresent := ms.histogramCache[name][compressedValue] + if !syncPresent { + var z uint64 + _, mapPresent := ms.histogramCache[name] + if !mapPresent { + ms.histogramCache[name] = make(map[int16]*uint64) + } + ms.histogramCache[name][compressedValue] = &z + } + atomic.AddUint64(ms.histogramCache[name][compressedValue], 1) + ms.histogramMu.Unlock() + } +} + +// RegisterGaugeFunc registers a function to be called at each interval +// whose return value will be used to populate the metric. +func (ms *MetricSystem) RegisterGaugeFunc(name string, f func() float64) { + ms.gaugeFuncsMu.Lock() + ms.gaugeFuncs[name] = f + ms.gaugeFuncsMu.Unlock() +} + +// DeregisterGaugeFunc deregisters a function for the metric. +func (ms *MetricSystem) DeregisterGaugeFunc(name string) { + ms.gaugeFuncsMu.Lock() + delete(ms.gaugeFuncs, name) + ms.gaugeFuncsMu.Unlock() +} + +// compress takes a float64 and lossily shrinks it to an int16 to facilitate +// bucketing of histogram values, staying within 1% of the true value. This +// fails for large values of 1e142 and above, and is inaccurate for values +// closer to 0 than +/- 0.51 or +/- math.Inf. +func compress(value float64) int16 { + i := int16(precision*math.Log(1.0+math.Abs(value)) + 0.5) + if value < 0 { + return -1 * i + } + return i +} + +// decompress takes a lossily shrunk int16 and returns a float64 within 1% of +// the original float64 passed to compress. +func decompress(compressedValue int16) float64 { + f := math.Exp(math.Abs(float64(compressedValue))/precision) - 1.0 + if compressedValue < 0 { + return -1.0 * f + } + return f +} + +// processHistograms derives rich metrics from histograms, currently +// percentiles, sum, count, and mean. +func (ms *MetricSystem) processHistograms(name string, + valuesToCounts map[int16]*uint64) map[string]float64 { + output := make(map[string]float64) + totalSum := float64(0) + totalCount := uint64(0) + proportions := make([]proportion, 0, len(valuesToCounts)) + for compressedValue, count := range valuesToCounts { + value := decompress(compressedValue) + totalSum += value * float64(*count) + totalCount += *count + proportions = append(proportions, proportion{Value: value, Count: *count}) + } + + sumName := fmt.Sprintf("%s_sum", name) + countName := fmt.Sprintf("%s_count", name) + avgName := fmt.Sprintf("%s_avg", name) + + // increment interval sum and count + output[countName] = float64(totalCount) + output[sumName] = totalSum + output[avgName] = totalSum / float64(totalCount) + + // increment aggregate sum and count + ms.histogramCountMu.RLock() + _, present := ms.histogramCountStore[sumName] + if !present { + ms.histogramCountMu.RUnlock() + ms.histogramCountMu.Lock() + _, syncPresent := ms.histogramCountStore[sumName] + if !syncPresent { + var x uint64 + ms.histogramCountStore[sumName] = &x + var z uint64 + ms.histogramCountStore[countName] = &z + } + ms.histogramCountMu.Unlock() + ms.histogramCountMu.RLock() + } + atomic.AddUint64(ms.histogramCountStore[sumName], uint64(totalSum)) + atomic.AddUint64(ms.histogramCountStore[countName], totalCount) + ms.histogramCountMu.RUnlock() + + for label, p := range ms.percentiles { + value, err := percentile(totalCount, proportions, p) + if err != nil { + glog.Errorf("unable to calculate percentile: %s", err) + } else { + output[fmt.Sprintf(label, name)] = value + } + } + return output +} + +// These next 3 methods are for the implementation of sort.Interface + +func (s proportionArray) Len() int { + return len(s) +} + +func (s proportionArray) Less(i, j int) bool { + return s[i].Value < s[j].Value +} + +func (s proportionArray) Swap(i, j int) { + s[i], s[j] = s[j], s[i] +} + +// percentile calculates a percentile represented as a float64 between 0 and 1 +// inclusive from a proportionArray. totalCount is the sum of all counts of +// elements in the proportionArray. +func percentile(totalCount uint64, proportions proportionArray, + percentile float64) (float64, error) { + //TODO(tyler) handle multiple percentiles at once for efficiency + sort.Sort(proportions) + sofar := uint64(0) + for _, proportion := range proportions { + sofar += proportion.Count + if float64(sofar)/float64(totalCount) >= percentile { + return proportion.Value, nil + } + } + return 0, errors.New("Invalid percentile. Should be between 0 and 1.") +} + +func (ms *MetricSystem) collectRawMetrics() *RawMetricSet { + normalizedInterval := time.Unix(0, time.Now().UnixNano()/ + ms.interval.Nanoseconds()* + ms.interval.Nanoseconds()) + + ms.counterMu.Lock() + freshCounters := ms.counterCache + ms.counterCache = make(map[string]*uint64) + ms.counterMu.Unlock() + + rates := make(map[string]uint64) + for name, count := range freshCounters { + rates[name] = *count + } + + counters := make(map[string]uint64) + ms.counterStoreMu.RLock() + // update counters + for name, count := range freshCounters { + _, exists := ms.counterStore[name] + // only take a write lock when it's a totally new counter + if !exists { + ms.counterStoreMu.RUnlock() + ms.counterStoreMu.Lock() + _, syncExists := ms.counterStore[name] + if !syncExists { + var z uint64 + ms.counterStore[name] = &z + } + ms.counterStoreMu.Unlock() + ms.counterStoreMu.RLock() + } + atomic.AddUint64(ms.counterStore[name], *count) + } + // copy counters for export + for name, count := range ms.counterStore { + counters[name] = *count + } + ms.counterStoreMu.RUnlock() + + ms.histogramMu.Lock() + histograms := ms.histogramCache + ms.histogramCache = make(map[string]map[int16]*uint64) + ms.histogramMu.Unlock() + + ms.gaugeFuncsMu.Lock() + gauges := make(map[string]float64) + for name, f := range ms.gaugeFuncs { + gauges[name] = f() + } + ms.gaugeFuncsMu.Unlock() + + return &RawMetricSet{ + Time: normalizedInterval, + Counters: counters, + Rates: rates, + Histograms: histograms, + Gauges: gauges, + } +} + +// processMetrics (potentially slowly) creates human consumable metrics from a +// RawMetricSet, deriving rich statistics from histograms such as percentiles. +func (ms *MetricSystem) processMetrics( + rawMetrics *RawMetricSet) *ProcessedMetricSet { + metrics := make(map[string]float64) + + for name, count := range rawMetrics.Counters { + metrics[name] = float64(count) + } + + for name, count := range rawMetrics.Rates { + metrics[fmt.Sprintf("%s_rate", name)] = float64(count) + } + + for name, valuesToCounts := range rawMetrics.Histograms { + for histoName, histoValue := range ms.processHistograms(name, valuesToCounts) { + metrics[histoName] = histoValue + } + } + + for name, value := range rawMetrics.Gauges { + metrics[name] = value + } + + return &ProcessedMetricSet{Time: rawMetrics.Time, Metrics: metrics} +} + +func (ms *MetricSystem) updateSubscribers() { + ms.subscribersMu.Lock() + defer ms.subscribersMu.Unlock() + for { + select { + case subscriber := <-ms.subscribeToRawMetrics: + ms.rawSubscribers[subscriber] = struct{}{} + case unsubscriber := <-ms.unsubscribeFromRawMetrics: + delete(ms.rawSubscribers, unsubscriber) + case subscriber := <-ms.subscribeToProcessedMetrics: + ms.processedSubscribers[subscriber] = struct{}{} + case unsubscriber := <-ms.unsubscribeFromProcessedMetrics: + delete(ms.processedSubscribers, unsubscriber) + default: // no changes in subscribers + return + } + } +} + +// reaper wakes up every seconds, +// collects and processes metrics, and pushes +// them to the corresponding subscribing channels. +func (ms *MetricSystem) reaper() { + ms.reaping = true + + // create goroutine pool to handle multiple processing tasks at once + processChan := make(chan func(), 16) + for i := 0; i < int(math.Max(float64(runtime.NumCPU()/4), 4)); i++ { + go func() { + for { + c, ok := <-processChan + if !ok { + return + } + c() + } + }() + } + + // begin reaper main loop + for { + // sleep until the next interval, or die if shutdownChan is closed + tts := ms.interval.Nanoseconds() - + (time.Now().UnixNano() % ms.interval.Nanoseconds()) + select { + case <-time.After(time.Duration(tts)): + case <-ms.shutdownChan: + ms.reaping = false + close(processChan) + return + } + + rawMetrics := ms.collectRawMetrics() + + ms.updateSubscribers() + + // broadcast raw metrics + for subscriber := range ms.rawSubscribers { + // new subscribers get all counters, otherwise just the new diffs + select { + case subscriber <- rawMetrics: + delete(ms.rawBadSubscribers, subscriber) + default: + ms.rawBadSubscribers[subscriber]++ + glog.Error("a raw subscriber has allowed their channel to fill up. ", + "dropping their metrics on the floor rather than blocking.") + if ms.rawBadSubscribers[subscriber] >= 2 { + glog.Error("this raw subscriber has caused dropped metrics at ", + "least 3 times in a row. closing the channel.") + delete(ms.rawSubscribers, subscriber) + close(subscriber) + } + } + } + + // Perform the rest in another goroutine since processing is not + // gauranteed to complete before the interval is up. + sendProcessed := func() { + // this is potentially expensive if there is a massive number of metrics + processedMetrics := ms.processMetrics(rawMetrics) + + // add aggregate mean + for name := range rawMetrics.Histograms { + ms.histogramCountMu.RLock() + aggCountPtr, countPresent := + ms.histogramCountStore[fmt.Sprintf("%s_count", name)] + aggCount := atomic.LoadUint64(aggCountPtr) + aggSumPtr, sumPresent := + ms.histogramCountStore[fmt.Sprintf("%s_sum", name)] + aggSum := atomic.LoadUint64(aggSumPtr) + ms.histogramCountMu.RUnlock() + + if countPresent && sumPresent && aggCount > 0 { + processedMetrics.Metrics[fmt.Sprintf("%s_agg_avg", name)] = + float64(aggSum / aggCount) + processedMetrics.Metrics[fmt.Sprintf("%s_agg_count", name)] = + float64(aggCount) + processedMetrics.Metrics[fmt.Sprintf("%s_agg_sum", name)] = + float64(aggSum) + } + } + + // broadcast processed metrics + ms.subscribersMu.Lock() + for subscriber := range ms.processedSubscribers { + select { + case subscriber <- processedMetrics: + delete(ms.processedBadSubscribers, subscriber) + default: + ms.processedBadSubscribers[subscriber]++ + glog.Error("a subscriber has allowed their channel to fill up. ", + "dropping their metrics on the floor rather than blocking.") + if ms.processedBadSubscribers[subscriber] >= 2 { + glog.Error("this subscriber has caused dropped metrics at ", + "least 3 times in a row. closing the channel.") + delete(ms.processedSubscribers, subscriber) + close(subscriber) + } + } + } + ms.subscribersMu.Unlock() + } + select { + case processChan <- sendProcessed: + default: + // processChan has filled up, this metric load is not sustainable + glog.Errorf("processing of metrics is taking longer than this node can "+ + "handle. dropping this entire interval of %s metrics on the "+ + "floor rather than blocking the reaper.", rawMetrics.Time) + } + } // end main reaper loop +} + +// Start spawns a goroutine for merging metrics into caches from +// metric submitters, and a reaper goroutine that harvests metrics at the +// default interval of every 60 seconds. +func (ms *MetricSystem) Start() { + if !ms.reaping { + go ms.reaper() + } +} + +// Stop shuts down a MetricSystem +func (ms *MetricSystem) Stop() { + close(ms.shutdownChan) +} diff --git a/Godeps/_workspace/src/github.com/spacejam/loghisto/metrics_test.go b/Godeps/_workspace/src/github.com/spacejam/loghisto/metrics_test.go new file mode 100644 index 000000000..cc9fb16af --- /dev/null +++ b/Godeps/_workspace/src/github.com/spacejam/loghisto/metrics_test.go @@ -0,0 +1,363 @@ +// Copyright 2014 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. See the AUTHORS file +// for names of contributors. +// +// Author: Tyler Neely (t@jujit.su) + +package loghisto + +import ( + "fmt" + "math" + "runtime" + "testing" + "time" +) + +func ExampleMetricSystem() { + ms := NewMetricSystem(time.Microsecond, true) + ms.Start() + myMetricStream := make(chan *ProcessedMetricSet, 2) + ms.SubscribeToProcessedMetrics(myMetricStream) + + timeToken := ms.StartTimer("submit_metrics") + ms.Counter("range_splits", 1) + ms.Histogram("some_ipc_latency", 123) + timeToken.Stop() + + processedMetricSet := <-myMetricStream + ms.UnsubscribeFromProcessedMetrics(myMetricStream) + + m := processedMetricSet.Metrics + + example := []struct { + Name string + Value float64 + }{ + { + "total range splits during the process lifetime", + m["range_splits"], + }, { + "range splits in this period", + m["range_splits_rate"], + }, { + "some_ipc 99.9th percentile", + m["some_ipc_latency_99.9"], + }, { + "some_ipc max", + m["some_ipc_latency_max"], + }, { + "some_ipc calls this period", + m["some_ipc_latency_count"], + }, { + "some_ipc calls during the process lifetime", + m["some_ipc_latency_agg_count"], + }, { + "some_ipc total latency this period", + m["some_ipc_latency_sum"], + }, { + "some_ipc mean this period", + m["some_ipc_latency_avg"], + }, { + "some_ipc aggregate man", + m["some_ipc_latency_agg_avg"], + }, { + "time spent submitting metrics this period", + m["submit_metrics_sum"], + }, { + "number of goroutines", + m["sys.NumGoroutine"], + }, { + "time spent in GC", + m["sys.PauseTotalNs"], + }, + } + for _, nameValue := range example { + var result string + if nameValue.Value == float64(0) { + result = "NOT present" + } else { + result = "present" + } + fmt.Println(nameValue.Name, result) + } + ms.Stop() + // Output: + // total range splits during the process lifetime present + // range splits in this period present + // some_ipc 99.9th percentile present + // some_ipc max present + // some_ipc calls this period present + // some_ipc calls during the process lifetime present + // some_ipc total latency this period present + // some_ipc mean this period present + // some_ipc aggregate man present + // time spent submitting metrics this period present + // number of goroutines present + // time spent in GC present +} + +func TestPercentile(t *testing.T) { + metrics := map[float64]uint64{ + 10: 9000, + 25: 900, + 33: 90, + 47: 9, + 500: 1, + } + + percentileToExpected := map[float64]float64{ + 0: 10, + .99: 25, + .999: 33, + .9991: 47, + .9999: 47, + 1: 500, + } + + totalcount := uint64(0) + proportions := make([]proportion, 0, len(metrics)) + for value, count := range metrics { + totalcount += count + proportions = append(proportions, proportion{Value: value, Count: count}) + } + + for p, expected := range percentileToExpected { + result, err := percentile(totalcount, proportions, p) + if err != nil { + t.Error("error:", err) + } + + // results must be within 1% of their expected values. + diff := math.Abs(expected/result - 1) + if diff > .01 { + t.Errorf("percentile: %.04f, expected: %.04f, actual: %.04f, %% off: %.04f\n", + p, expected, result, diff*100) + } + } +} + +func TestCompress(t *testing.T) { + toTest := []float64{ + -421408208120481, + -1, + 0, + 1, + 214141241241241, + } + for _, f := range toTest { + result := decompress(compress(f)) + var diff float64 + if result == 0 { + diff = math.Abs(f - result) + } else { + diff = math.Abs(f/result - 1) + } + if diff > .01 { + t.Errorf("expected: %f, actual: %f, %% off: %.04f\n", + f, result, diff*100) + } + } +} + +func TestSysStats(t *testing.T) { + metricSystem := NewMetricSystem(time.Microsecond, true) + gauges := metricSystem.collectRawMetrics().Gauges + v, present := gauges["sys.Alloc"] + if v <= 0 || !present { + t.Errorf("expected positive reported allocated bytes, got %f\n", v) + } +} + +func TestTimer(t *testing.T) { + metricSystem := NewMetricSystem(time.Microsecond, false) + token1 := metricSystem.StartTimer("timer1") + token2 := metricSystem.StartTimer("timer1") + time.Sleep(50 & time.Microsecond) + token1.Stop() + time.Sleep(5 * time.Microsecond) + token2.Stop() + token3 := metricSystem.StartTimer("timer1") + time.Sleep(10 * time.Microsecond) + token3.Stop() + result := metricSystem.processMetrics(metricSystem.collectRawMetrics()).Metrics + + if result["timer1_min"] > result["timer1_50"] || + result["timer1_50"] > result["timer1_max"] { + t.Error("bad result map:", result) + } +} + +func TestRate(t *testing.T) { + metricSystem := NewMetricSystem(time.Microsecond, false) + metricSystem.Counter("rate1", 777) + time.Sleep(20 * time.Millisecond) + metrics := metricSystem.processMetrics(metricSystem.collectRawMetrics()).Metrics + if metrics["rate1_rate"] != 777 { + t.Error("count one value") + } + metricSystem.Counter("rate1", 1223) + time.Sleep(20 * time.Millisecond) + metrics = metricSystem.processMetrics(metricSystem.collectRawMetrics()).Metrics + if metrics["rate1_rate"] != 1223 { + t.Errorf("expected rate: 1223, actual: %f", metrics["rate1_rate"]) + } + metricSystem.Counter("rate1", 1223) + metricSystem.Counter("rate1", 1223) + time.Sleep(20 * time.Millisecond) + metrics = metricSystem.processMetrics(metricSystem.collectRawMetrics()).Metrics + if metrics["rate1_rate"] != 2446 { + t.Errorf("expected rate: 2446, actual: %f", metrics["rate1_rate"]) + } +} + +func TestCounter(t *testing.T) { + metricSystem := NewMetricSystem(time.Microsecond, false) + metricSystem.Counter("counter1", 3290) + time.Sleep(20 * time.Millisecond) + metrics := metricSystem.processMetrics(metricSystem.collectRawMetrics()).Metrics + if metrics["counter1"] != 3290 { + t.Error("count one value", metrics) + } + metricSystem.Counter("counter1", 10000) + time.Sleep(20 * time.Millisecond) + metrics = metricSystem.processMetrics(metricSystem.collectRawMetrics()).Metrics + if metrics["counter1"] != 13290 { + t.Error("accumulate counts across broadcasts") + } + +} + +func TestUpdateSubscribers(t *testing.T) { + rawMetricStream := make(chan *RawMetricSet) + processedMetricStream := make(chan *ProcessedMetricSet) + + metricSystem := NewMetricSystem(2*time.Microsecond, false) + metricSystem.SubscribeToRawMetrics(rawMetricStream) + metricSystem.SubscribeToProcessedMetrics(processedMetricStream) + + metricSystem.Counter("counter5", 33) + + go func() { + select { + case <-rawMetricStream: + case <-time.After(20 * time.Millisecond): + t.Error("received no raw metrics from the MetricSystem after 2 milliseconds.") + } + metricSystem.UnsubscribeFromRawMetrics(rawMetricStream) + }() + go func() { + select { + case <-processedMetricStream: + case <-time.After(20 * time.Millisecond): + t.Error("received no processed metrics from the MetricSystem after 2 milliseconds.") + } + metricSystem.UnsubscribeFromProcessedMetrics(processedMetricStream) + }() + + metricSystem.Start() + time.Sleep(20 * time.Millisecond) + + go func() { + select { + case <-rawMetricStream: + t.Error("received raw metrics from the MetricSystem after unsubscribing.") + default: + } + }() + go func() { + select { + case <-processedMetricStream: + t.Error("received processed metrics from the MetricSystem after unsubscribing.") + default: + } + }() + time.Sleep(20 * time.Millisecond) +} + +func TestProcessedBroadcast(t *testing.T) { + processedMetricStream := make(chan *ProcessedMetricSet, 128) + metricSystem := NewMetricSystem(time.Microsecond, false) + metricSystem.SubscribeToProcessedMetrics(processedMetricStream) + + metricSystem.Histogram("histogram1", 33) + metricSystem.Histogram("histogram1", 59) + metricSystem.Histogram("histogram1", 330000) + metricSystem.Start() + + select { + case processedMetrics := <-processedMetricStream: + if int(processedMetrics.Metrics["histogram1_sum"]) != 331132 { + t.Error("expected histogram1_sum to be 331132, instead was", + processedMetrics.Metrics["histogram1_sum"]) + } + if int(processedMetrics.Metrics["histogram1_agg_avg"]) != 110377 { + t.Error("expected histogram1_agg_avg to be 110377, instead was", + processedMetrics.Metrics["histogram1_agg_avg"]) + } + if int(processedMetrics.Metrics["histogram1_count"]) != 3 { + t.Error("expected histogram1_count to be 3, instead was", + processedMetrics.Metrics["histogram1_count"]) + } + case <-time.After(20 * time.Millisecond): + t.Error("received no metrics from the MetricSystem after 2 milliseconds.") + } + + metricSystem.UnsubscribeFromProcessedMetrics(processedMetricStream) + metricSystem.Stop() +} + +func TestRawBroadcast(t *testing.T) { + rawMetricStream := make(chan *RawMetricSet, 128) + metricSystem := NewMetricSystem(time.Microsecond, false) + metricSystem.SubscribeToRawMetrics(rawMetricStream) + + metricSystem.Counter("counter2", 10) + metricSystem.Counter("counter2", 111) + metricSystem.Start() + + select { + case rawMetrics := <-rawMetricStream: + if rawMetrics.Counters["counter2"] != 121 { + t.Error("expected counter2 to be 121, instead was", + rawMetrics.Counters["counter2"]) + } + if rawMetrics.Rates["counter2"] != 121 { + t.Error("expected counter2 rate to be 121, instead was", + rawMetrics.Counters["counter2"]) + } + case <-time.After(20 * time.Millisecond): + t.Error("received no metrics from the MetricSystem after 2 milliseconds.") + } + + metricSystem.UnsubscribeFromRawMetrics(rawMetricStream) + metricSystem.Stop() +} + +func TestMetricSystemStop(t *testing.T) { + metricSystem := NewMetricSystem(time.Microsecond, false) + + startingRoutines := runtime.NumGoroutine() + + metricSystem.Start() + metricSystem.Stop() + + time.Sleep(20 * time.Millisecond) + + endRoutines := runtime.NumGoroutine() + if startingRoutines < endRoutines { + t.Errorf("lingering goroutines have not been cleaned up: "+ + "before: %d, after: %d\n", startingRoutines, endRoutines) + } +} diff --git a/Godeps/_workspace/src/github.com/spacejam/loghisto/opentsdb.go b/Godeps/_workspace/src/github.com/spacejam/loghisto/opentsdb.go new file mode 100644 index 000000000..b84855fa7 --- /dev/null +++ b/Godeps/_workspace/src/github.com/spacejam/loghisto/opentsdb.go @@ -0,0 +1,85 @@ +// Copyright 2014 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. See the AUTHORS file +// for names of contributors. +// +// Author: Tyler Neely (t@jujit.su) + +package loghisto + +import ( + "bytes" + "fmt" + "os" + "strings" + +) + +type openTSDBStat struct { + Metric string + Time int64 + Value float64 + Tags map[string]string +} + +type openTSDBStatArray []*openTSDBStat + +func mapToTSDProtocolTags(tagMap map[string]string) string { + tags := make([]string, 0, len(tagMap)) + for tag, value := range tagMap { + tags = append(tags, fmt.Sprintf("%s=%s", tag, value)) + } + return strings.Join(tags, " ") +} + +func (stats openTSDBStatArray) ToRequest() []byte { + var request bytes.Buffer + for _, stat := range stats { + request.Write([]byte(fmt.Sprintf("put %s %d %f %s\n", + stat.Metric, + stat.Time, + stat.Value, + mapToTSDProtocolTags(stat.Tags)))) + } + return []byte(request.String()) +} + +func (metricSet *ProcessedMetricSet) toopenTSDBStats() openTSDBStatArray { + hostname, err := os.Hostname() + if err != nil { + hostname = "unknown" + } + + stats := make([]*openTSDBStat, 0, len(metricSet.Metrics)) + i := 0 + for metric, value := range metricSet.Metrics { + var tags = map[string]string{ + "host": hostname, + } + //TODO(tyler) custom tags + stats = append(stats, &openTSDBStat{ + Metric: metric, + Time: metricSet.Time.Unix(), + Value: value, + Tags: tags, + }) + i++ + } + return stats +} + +// OpenTSDBProtocol generates a wire representation of a ProcessedMetricSet +// for submission to an OpenTSDB instance. +func OpenTSDBProtocol(ms *ProcessedMetricSet) []byte { + return ms.toopenTSDBStats().ToRequest() +} diff --git a/Godeps/_workspace/src/github.com/spacejam/loghisto/opentsdb_test.go b/Godeps/_workspace/src/github.com/spacejam/loghisto/opentsdb_test.go new file mode 100644 index 000000000..c5d6b1bd6 --- /dev/null +++ b/Godeps/_workspace/src/github.com/spacejam/loghisto/opentsdb_test.go @@ -0,0 +1,23 @@ +package loghisto + +import ( + "testing" + "time" +) + +func TestOpenTSDB(t *testing.T) { + ms := NewMetricSystem(time.Second, true) + s := NewSubmitter(ms, OpenTSDBProtocol, "tcp", "localhost:7777") + s.Start() + + metrics := &ProcessedMetricSet{ + Time: time.Now(), + Metrics: map[string]float64{ + "test.1": 43.32, + "test.2": 12.3, + }, + } + request := s.serializer(metrics) + s.submit(request) + s.Shutdown() +} diff --git a/Godeps/_workspace/src/github.com/spacejam/loghisto/print_benchmark.go b/Godeps/_workspace/src/github.com/spacejam/loghisto/print_benchmark.go new file mode 100644 index 000000000..4d147a877 --- /dev/null +++ b/Godeps/_workspace/src/github.com/spacejam/loghisto/print_benchmark.go @@ -0,0 +1,106 @@ +// Copyright 2014 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. See the AUTHORS file +// for names of contributors. +// +// Author: Tyler Neely (t@jujit.su) + +package loghisto + +import ( + "fmt" + "os" + "runtime" + "text/tabwriter" + "time" +) + +// PrintBenchmark will run the provided function at the specified +// concurrency, time the operation, and once per second write the +// following information to standard out: +// +// 2014-08-09 17:44:57 -0400 EDT +// raft_AppendLogEntries_count: 16488 +// raft_AppendLogEntries_max: 3.982478339757623e+07 +// raft_AppendLogEntries_99.99: 3.864778314316012e+07 +// raft_AppendLogEntries_99.9: 3.4366224772310276e+06 +// raft_AppendLogEntries_99: 2.0228126576114902e+06 +// raft_AppendLogEntries_50: 469769.7083161708 +// raft_AppendLogEntries_min: 129313.15075081984 +// raft_AppendLogEntries_sum: 9.975892639594093e+09 +// raft_AppendLogEntries_avg: 605039.5827022133 +// raft_AppendLogEntries_agg_avg: 618937 +// raft_AppendLogEntries_agg_count: 121095 +// raft_AppendLogEntries_agg_sum: 7.4950269894e+10 +// sys.Alloc: 997328 +// sys.NumGC: 1115 +// sys.PauseTotalNs: 2.94946542e+08 +// sys.NumGoroutine: 26 +func PrintBenchmark(name string, concurrency uint, op func()) { + runtime.GOMAXPROCS(runtime.NumCPU()) + var ms = NewMetricSystem(time.Second, true) + mc := make(chan *ProcessedMetricSet, 1) + ms.SubscribeToProcessedMetrics(mc) + ms.Start() + defer ms.Stop() + + go receiver(name, mc) + + for i := uint(0); i < concurrency; i++ { + go func() { + for { + timer := ms.StartTimer(name) + op() + timer.Stop() + } + }() + } + + <-make(chan struct{}) +} + +func receiver(name string, mc chan *ProcessedMetricSet) { + interesting := []string{ + fmt.Sprintf("%s_count", name), + fmt.Sprintf("%s_max", name), + fmt.Sprintf("%s_99.99", name), + fmt.Sprintf("%s_99.9", name), + fmt.Sprintf("%s_99", name), + fmt.Sprintf("%s_95", name), + fmt.Sprintf("%s_90", name), + fmt.Sprintf("%s_75", name), + fmt.Sprintf("%s_50", name), + fmt.Sprintf("%s_min", name), + fmt.Sprintf("%s_sum", name), + fmt.Sprintf("%s_avg", name), + fmt.Sprintf("%s_agg_avg", name), + fmt.Sprintf("%s_agg_count", name), + fmt.Sprintf("%s_agg_sum", name), + "sys.Alloc", + "sys.NumGC", + "sys.PauseTotalNs", + "sys.NumGoroutine", + } + + w := new(tabwriter.Writer) + w.Init(os.Stdout, 0, 8, 0, '\t', 0) + + for m := range mc { + fmt.Fprintln(w, m.Time) + for _, e := range interesting { + fmt.Fprintln(w, fmt.Sprintf("%s:\t", e), m.Metrics[e]) + } + fmt.Fprintln(w) + w.Flush() + } +} diff --git a/Godeps/_workspace/src/github.com/spacejam/loghisto/readme.md b/Godeps/_workspace/src/github.com/spacejam/loghisto/readme.md new file mode 100644 index 000000000..67a61b61b --- /dev/null +++ b/Godeps/_workspace/src/github.com/spacejam/loghisto/readme.md @@ -0,0 +1,113 @@ +loghisto +============ +[![Build Status](https://travis-ci.org/spacejam/loghisto.svg)](https://travis-ci.org/spacejam/loghisto) + +A metric system for high performance counters and histograms. Unlike popular metric systems today, this does not destroy the accuracy of histograms by sampling. Instead, a logarithmic bucketing function compresses values, generally within 1% of their true value (although between 0 and 1 the precision loss may not be within this boundary). This allows for extreme compression, which allows us to calculate arbitrarily high percentiles with no loss of accuracy - just a small amount of precision. This is particularly useful for highly-clustered events that are tolerant of a small precision loss, but for which you REALLY care about what the tail looks like, such as measuring latency across a distributed system. + +Copied out of my work for the CockroachDB metrics system. Based on an algorithm created by Keith Frost. + + +### running a print benchmark for quick analysis +```go +package main + +import ( + "runtime" + "github.com/spacejam/loghisto" +) + +func benchmark() { + // do some stuff +} + +func main() { + numCPU := runtime.NumCPU() + runtime.GOMAXPROCS(numCPU) + + desiredConcurrency := uint(100) + loghisto.PrintBenchmark("benchmark1234", desiredConcurrency, benchmark) +} +``` +results in something like this printed to stdout each second: +``` +2014-12-11 21:41:45 -0500 EST +benchmark1234_count: 2.0171025e+07 +benchmark1234_max: 2.4642914167480484e+07 +benchmark1234_99.99: 4913.768840299134 +benchmark1234_99.9: 1001.2472422902518 +benchmark1234_99: 71.24044000732538 +benchmark1234_95: 67.03348428941965 +benchmark1234_90: 65.68633104092515 +benchmark1234_75: 63.07152259993664 +benchmark1234_50: 58.739891704145194 +benchmark1234_min: -657.5233632152207 // Corollary: time.Since(time.Now()) is often < 0 +benchmark1234_sum: 1.648051169322668e+09 +benchmark1234_avg: 81.70388809307748 +benchmark1234_agg_avg: 89 +benchmark1234_agg_count: 6.0962226e+07 +benchmark1234_agg_sum: 5.454779078e+09 +sys.Alloc: 1.132672e+06 +sys.NumGC: 5741 +sys.PauseTotalNs: 1.569390954e+09 +sys.NumGoroutine: 113 +``` +### adding an embedded metric system to your code +```go +import ( + "time" + "fmt" + "github.com/spacejam/loghisto" +) +func ExampleMetricSystem() { + // Create metric system that reports once a minute, and includes stats + // about goroutines, memory usage and GC. + includeGoProcessStats := true + ms := loghisto.NewMetricSystem(time.Minute, includeGoProcessStats) + ms.Start() + + // create a channel that subscribes to metrics as they are produced once + // per minute. + // NOTE: if you allow this channel to fill up, the metric system will NOT + // block, and will FORGET about your channel if you fail to unblock the + // channel after 3 configured intervals (in this case 3 minutes) rather + // than causing a memory leak. + myMetricStream := make(chan *loghisto.ProcessedMetricSet, 2) + ms.SubscribeToProcessedMetrics(myMetricStream) + + // create some metrics + timeToken := ms.StartTimer("time for creating a counter and histo") + ms.Counter("some event", 1) + ms.Histogram("some measured thing", 123) + timeToken.Stop() + + for m := range myMetricStream { + fmt.Printf("number of goroutines: %f\n", m.Metrics["sys.NumGoroutine"]) + } + + // if you want to manually unsubscribe from the metric stream + ms.UnsubscribeFromProcessedMetrics(myMetricStream) + + // to stop and clean up your metric system + ms.Stop() +} +``` +### automatically sending your metrics to OpenTSDB, KairosDB or Graphite +```go +func ExampleExternalSubmitter() { + includeGoProcessStats := true + ms := NewMetricSystem(time.Minute, includeGoProcessStats) + ms.Start() + // graphite + s := NewSubmitter(ms, GraphiteProtocol, "tcp", "localhost:7777") + s.Start() + + // opentsdb / kairosdb + s := NewSubmitter(ms, OpenTSDBProtocol, "tcp", "localhost:7777") + s.Start() + + // to tear down: + s.Shutdown() +} +``` + +See code for the Graphite/OpenTSDB protocols for adding your own output plugins, it's pretty simple. diff --git a/Godeps/_workspace/src/github.com/spacejam/loghisto/submitter.go b/Godeps/_workspace/src/github.com/spacejam/loghisto/submitter.go new file mode 100644 index 000000000..d4835c0c5 --- /dev/null +++ b/Godeps/_workspace/src/github.com/spacejam/loghisto/submitter.go @@ -0,0 +1,159 @@ +// Copyright 2014 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. See the AUTHORS file +// for names of contributors. +// +// Author: Tyler Neely (t@jujit.su) + +package loghisto + +import ( + "net" + "sync" + "time" + +) + +type requestable interface{} + +type requestableArray interface { + ToRequest() []byte +} + +// Submitter encapsulates the state of a metric submitter. +type Submitter struct { + // backlog works as an evicting queue + backlog [60][]byte + backlogHead uint + backlogTail uint + backlogMu sync.Mutex + serializer func(*ProcessedMetricSet) []byte + DestinationNetwork string + DestinationAddress string + metricSystem *MetricSystem + metricChan chan *ProcessedMetricSet + shutdownChan chan struct{} +} + +// NewSubmitter creates a Submitter that receives metrics off of a +// specified metric channel, serializes them using the provided +// serialization function, and attempts to send them to the +// specified destination. +func NewSubmitter(metricSystem *MetricSystem, + serializer func(*ProcessedMetricSet) []byte, destinationNetwork string, + destinationAddress string) *Submitter { + metricChan := make(chan *ProcessedMetricSet, 60) + metricSystem.SubscribeToProcessedMetrics(metricChan) + return &Submitter{ + backlog: [60][]byte{}, + backlogHead: 0, + backlogTail: 0, + serializer: serializer, + DestinationNetwork: destinationNetwork, + DestinationAddress: destinationAddress, + metricSystem: metricSystem, + metricChan: metricChan, + shutdownChan: make(chan struct{}), + } +} + +func (s *Submitter) retryBacklog() error { + var request []byte + for { + s.backlogMu.Lock() + head := s.backlogHead + tail := s.backlogTail + if head != tail { + request = s.backlog[head] + } + s.backlogMu.Unlock() + + if head == tail { + return nil + } + + err := s.submit(request) + if err != nil { + return err + } + s.backlogMu.Lock() + s.backlogHead = (s.backlogHead + 1) % 60 + s.backlogMu.Unlock() + } +} + +func (s *Submitter) appendToBacklog(request []byte) { + s.backlogMu.Lock() + s.backlog[s.backlogTail] = request + s.backlogTail = (s.backlogTail + 1) % 60 + // if we've run into the head, evict it + if s.backlogHead == s.backlogTail { + s.backlogHead = (s.backlogHead + 1) % 60 + } + s.backlogMu.Unlock() +} + +func (s *Submitter) submit(request []byte) error { + conn, err := net.DialTimeout(s.DestinationNetwork, s.DestinationAddress, + 5*time.Second) + if err != nil { + return err + } + conn.SetDeadline(time.Now().Add(5 * time.Second)) + _, err = conn.Write(request) + conn.Close() + return err +} + +// Start creates the goroutines that receive, serialize, and send metrics. +func (s *Submitter) Start() { + go func() { + for { + select { + case metrics, ok := <-s.metricChan: + if !ok { + // We can no longer make progress. + return + } + request := s.serializer(metrics) + s.appendToBacklog(request) + case <-s.shutdownChan: + return + } + } + }() + + go func() { + for { + select { + case <-s.shutdownChan: + return + default: + s.retryBacklog() + tts := s.metricSystem.interval.Nanoseconds() - + (time.Now().UnixNano() % s.metricSystem.interval.Nanoseconds()) + time.Sleep(time.Duration(tts)) + } + } + }() +} + +// Shutdown shuts down a submitter +func (s *Submitter) Shutdown() { + select { + case <-s.shutdownChan: + // already closed + default: + close(s.shutdownChan) + } +}