// Copyright 2015 CoreOS, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package rafthttp import ( "encoding/binary" "fmt" "io" "time" "github.com/coreos/etcd/etcdserver/stats" "github.com/coreos/etcd/pkg/pbutil" "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/raft/raftpb" ) const ( msgTypeLinkHeartbeat uint8 = 0 msgTypeAppEntries uint8 = 1 msgTypeApp uint8 = 2 msgAppV2BufSize = 1024 * 1024 ) // msgappv2 stream sends three types of message: linkHeartbeatMessage, // AppEntries and MsgApp. AppEntries is the MsgApp that is sent in // replicate state in raft, whose index and term are fully predictable. // // Data format of linkHeartbeatMessage: // | offset | bytes | description | // +--------+-------+-------------+ // | 0 | 1 | \x00 | // // Data format of AppEntries: // | offset | bytes | description | // +--------+-------+-------------+ // | 0 | 1 | \x01 | // | 1 | 8 | length of entries | // | 9 | 8 | length of first entry | // | 17 | n1 | first entry | // ... // | x | 8 | length of k-th entry data | // | x+8 | nk | k-th entry data | // | x+8+nk | 8 | commit index | // // Data format of MsgApp: // | offset | bytes | description | // +--------+-------+-------------+ // | 0 | 1 | \x02 | // | 1 | 8 | length of encoded message | // | 9 | n | encoded message | type msgAppV2Encoder struct { w io.Writer fs *stats.FollowerStats term uint64 index uint64 buf []byte uint64buf []byte uint8buf []byte } func newMsgAppV2Encoder(w io.Writer, fs *stats.FollowerStats) *msgAppV2Encoder { return &msgAppV2Encoder{ w: w, fs: fs, buf: make([]byte, msgAppV2BufSize), uint64buf: make([]byte, 8), uint8buf: make([]byte, 1), } } func (enc *msgAppV2Encoder) encode(m raftpb.Message) error { start := time.Now() switch { case isLinkHeartbeatMessage(m): enc.uint8buf[0] = byte(msgTypeLinkHeartbeat) if _, err := enc.w.Write(enc.uint8buf); err != nil { return err } case enc.index == m.Index && enc.term == m.LogTerm && m.LogTerm == m.Term: enc.uint8buf[0] = byte(msgTypeAppEntries) if _, err := enc.w.Write(enc.uint8buf); err != nil { return err } // write length of entries binary.BigEndian.PutUint64(enc.uint64buf, uint64(len(m.Entries))) if _, err := enc.w.Write(enc.uint64buf); err != nil { return err } for i := 0; i < len(m.Entries); i++ { // write length of entry binary.BigEndian.PutUint64(enc.uint64buf, uint64(m.Entries[i].Size())) if _, err := enc.w.Write(enc.uint64buf); err != nil { return err } if n := m.Entries[i].Size(); n < msgAppV2BufSize { if _, err := m.Entries[i].MarshalTo(enc.buf); err != nil { return err } if _, err := enc.w.Write(enc.buf[:n]); err != nil { return err } } else { if _, err := enc.w.Write(pbutil.MustMarshal(&m.Entries[i])); err != nil { return err } } enc.index++ } // write commit index binary.BigEndian.PutUint64(enc.uint64buf, m.Commit) if _, err := enc.w.Write(enc.uint64buf); err != nil { return err } enc.fs.Succ(time.Since(start)) default: if err := binary.Write(enc.w, binary.BigEndian, msgTypeApp); err != nil { return err } // write size of message if err := binary.Write(enc.w, binary.BigEndian, uint64(m.Size())); err != nil { return err } // write message if _, err := enc.w.Write(pbutil.MustMarshal(&m)); err != nil { return err } enc.term = m.Term enc.index = m.Index if l := len(m.Entries); l > 0 { enc.index = m.Entries[l-1].Index } enc.fs.Succ(time.Since(start)) } return nil } type msgAppV2Decoder struct { r io.Reader local, remote types.ID term uint64 index uint64 buf []byte uint64buf []byte uint8buf []byte } func newMsgAppV2Decoder(r io.Reader, local, remote types.ID) *msgAppV2Decoder { return &msgAppV2Decoder{ r: r, local: local, remote: remote, buf: make([]byte, msgAppV2BufSize), uint64buf: make([]byte, 8), uint8buf: make([]byte, 1), } } func (dec *msgAppV2Decoder) decode() (raftpb.Message, error) { var ( m raftpb.Message typ uint8 ) if _, err := io.ReadFull(dec.r, dec.uint8buf); err != nil { return m, err } typ = uint8(dec.uint8buf[0]) switch typ { case msgTypeLinkHeartbeat: return linkHeartbeatMessage, nil case msgTypeAppEntries: m = raftpb.Message{ Type: raftpb.MsgApp, From: uint64(dec.remote), To: uint64(dec.local), Term: dec.term, LogTerm: dec.term, Index: dec.index, } // decode entries if _, err := io.ReadFull(dec.r, dec.uint64buf); err != nil { return m, err } l := binary.BigEndian.Uint64(dec.uint64buf) m.Entries = make([]raftpb.Entry, int(l)) for i := 0; i < int(l); i++ { if _, err := io.ReadFull(dec.r, dec.uint64buf); err != nil { return m, err } size := binary.BigEndian.Uint64(dec.uint64buf) var buf []byte if size < msgAppV2BufSize { buf = dec.buf[:size] if _, err := io.ReadFull(dec.r, buf); err != nil { return m, err } } else { buf = make([]byte, int(size)) if _, err := io.ReadFull(dec.r, buf); err != nil { return m, err } } dec.index++ // 1 alloc pbutil.MustUnmarshal(&m.Entries[i], buf) } // decode commit index if _, err := io.ReadFull(dec.r, dec.uint64buf); err != nil { return m, err } m.Commit = binary.BigEndian.Uint64(dec.uint64buf) case msgTypeApp: var size uint64 if err := binary.Read(dec.r, binary.BigEndian, &size); err != nil { return m, err } buf := make([]byte, int(size)) if _, err := io.ReadFull(dec.r, buf); err != nil { return m, err } pbutil.MustUnmarshal(&m, buf) dec.term = m.Term dec.index = m.Index if l := len(m.Entries); l > 0 { dec.index = m.Entries[l-1].Index } default: return m, fmt.Errorf("failed to parse type %d in msgappv2 stream", typ) } return m, nil }