pool idRefs/bytes in diff cache

master
Oliver Tonnhofer 2013-11-19 17:27:43 +01:00
parent c3a7be1ab2
commit a4ea44acb1
3 changed files with 171 additions and 14 deletions

98
cache/binary/diff.go vendored
View File

@ -46,6 +46,52 @@ func MarshalIdRefsBunch(idRefs []element.IdRefs) []byte {
return buf[:nextPos]
}
func MarshalIdRefsBunch2(idRefs []element.IdRefs, buf []byte) []byte {
lastRef := int64(0)
lastId := int64(0)
nextPos := 0
estSize := len(idRefs)*(4+1+6) + binary.MaxVarintLen64
if cap(buf) < estSize {
buf = make([]byte, estSize)
} else {
// expand to full capacity
buf = buf[:cap(buf)]
}
nextPos += binary.PutUvarint(buf[nextPos:], uint64(len(idRefs)))
for _, idRef := range idRefs {
if len(buf)-nextPos < binary.MaxVarintLen64 {
tmp := make([]byte, cap(buf)*2)
copy(tmp, buf[:nextPos])
buf = tmp
}
nextPos += binary.PutVarint(buf[nextPos:], idRef.Id-lastId)
lastId = idRef.Id
}
for _, idRef := range idRefs {
if len(buf)-nextPos < binary.MaxVarintLen64 {
tmp := make([]byte, cap(buf)*2)
copy(tmp, buf[:nextPos])
buf = tmp
}
nextPos += binary.PutUvarint(buf[nextPos:], uint64(len(idRef.Refs)))
}
for _, idRef := range idRefs {
for _, ref := range idRef.Refs {
if len(buf)-nextPos < binary.MaxVarintLen64 {
tmp := make([]byte, cap(buf)*2)
copy(tmp, buf[:nextPos])
buf = tmp
}
nextPos += binary.PutVarint(buf[nextPos:], ref-lastRef)
lastRef = ref
}
}
return buf[:nextPos]
}
func UnmarshalIdRefsBunch(buf []byte) []element.IdRefs {
length, n := binary.Uvarint(buf)
if n <= 0 {
@ -89,3 +135,55 @@ func UnmarshalIdRefsBunch(buf []byte) []element.IdRefs {
}
return idRefs
}
func UnmarshalIdRefsBunch2(buf []byte, idRefs []element.IdRefs) []element.IdRefs {
length, n := binary.Uvarint(buf)
if n <= 0 {
return nil
}
offset := n
if uint64(cap(idRefs)) < length {
idRefs = make([]element.IdRefs, length)
} else {
idRefs = idRefs[:length]
}
last := int64(0)
for i := 0; uint64(i) < length; i++ {
idRefs[i].Id, n = binary.Varint(buf[offset:])
if n <= 0 {
panic("no data")
}
offset += n
idRefs[i].Id += last
last = idRefs[i].Id
}
var numRefs uint64
for i := 0; uint64(i) < length; i++ {
numRefs, n = binary.Uvarint(buf[offset:])
if n <= 0 {
panic("no data")
}
offset += n
if uint64(cap(idRefs[i].Refs)) < numRefs {
idRefs[i].Refs = make([]int64, numRefs)
} else {
idRefs[i].Refs = idRefs[i].Refs[:numRefs]
}
}
last = 0
for idIdx := 0; uint64(idIdx) < length; idIdx++ {
for refIdx := 0; refIdx < len(idRefs[idIdx].Refs); refIdx++ {
idRefs[idIdx].Refs[refIdx], n = binary.Varint(buf[offset:])
if n <= 0 {
panic("no data")
}
offset += n
idRefs[idIdx].Refs[refIdx] += last
last = idRefs[idIdx].Refs[refIdx]
}
}
return idRefs
}

View File

@ -16,8 +16,8 @@ func TestmarshalBunch(t *testing.T) {
{123924132, []int64{912412210, 9124213, 212412210}},
}
buf := MarshalIdRefsBunch(bunch)
newBunch := UnmarshalIdRefsBunch(buf)
buf := MarshalIdRefsBunch2(bunch, nil)
newBunch := UnmarshalIdRefsBunch2(buf, nil)
t.Log(len(buf), float64(len(buf))/6.0)
@ -47,9 +47,10 @@ func BenchmarkMarshalBunch(b *testing.B) {
{123924130, []int64{91241213}},
{123924132, []int64{912412210, 9124213, 212412210}},
}
idRefs := []element.IdRefs{}
buf := []byte{}
for i := 0; i < b.N; i++ {
buf := MarshalIdRefsBunch(bunch)
UnmarshalIdRefsBunch(buf)
buf = MarshalIdRefsBunch2(bunch, buf)
idRefs = UnmarshalIdRefsBunch2(buf, idRefs)
}
}

76
cache/diff.go vendored
View File

@ -251,7 +251,9 @@ func (index *bunchRefCache) Get(id int64) []int64 {
}
if data != nil {
for _, idRef := range binary.UnmarshalIdRefsBunch(data) {
idRefs := idRefsPool.get()
defer idRefsPool.release(idRefs)
for _, idRef := range binary.UnmarshalIdRefsBunch2(data, idRefs) {
if idRef.Id == id {
return idRef.Refs
}
@ -270,14 +272,18 @@ func (index *bunchRefCache) Add(id, ref int64) error {
var idRefs []element.IdRefs
if data != nil {
idRefs = binary.UnmarshalIdRefsBunch(data)
idRefs = idRefsPool.get()
defer idRefsPool.release(idRefs)
idRefs = binary.UnmarshalIdRefsBunch2(data, idRefs)
}
idRefBunch := idRefBunch{index.getBunchId(id), idRefs}
idRef := idRefBunch.getCreate(id)
idRef.Add(ref)
data = binary.MarshalIdRefsBunch(idRefBunch.idRefs)
data = bytePool.get()
defer bytePool.release(data)
data = binary.MarshalIdRefsBunch2(idRefBunch.idRefs, data)
return index.db.Put(index.wo, keyBuf, data)
}
@ -295,12 +301,16 @@ func (index *bunchRefCache) DeleteRef(id, ref int64) error {
}
if data != nil {
idRefs := binary.UnmarshalIdRefsBunch(data)
idRefs := idRefsPool.get()
defer idRefsPool.release(idRefs)
idRefs = binary.UnmarshalIdRefsBunch2(data, idRefs)
idRefBunch := idRefBunch{index.getBunchId(id), idRefs}
idRef := idRefBunch.get(id)
if idRef != nil {
idRef.Delete(ref)
data := binary.MarshalIdRefsBunch(idRefs)
data := bytePool.get()
defer bytePool.release(data)
data = binary.MarshalIdRefsBunch2(idRefs, data)
return index.db.Put(index.wo, keyBuf, data)
}
}
@ -320,12 +330,16 @@ func (index *bunchRefCache) Delete(id int64) error {
}
if data != nil {
idRefs := binary.UnmarshalIdRefsBunch(data)
idRefs := idRefsPool.get()
defer idRefsPool.release(idRefs)
idRefs = binary.UnmarshalIdRefsBunch2(data, idRefs)
idRefBunch := idRefBunch{index.getBunchId(id), idRefs}
idRef := idRefBunch.get(id)
if idRef != nil {
idRef.Refs = []int64{}
data := binary.MarshalIdRefsBunch(idRefs)
data := bytePool.get()
defer bytePool.release(data)
data = binary.MarshalIdRefsBunch2(idRefs, data)
return index.db.Put(index.wo, keyBuf, data)
}
}
@ -459,6 +473,7 @@ func (index *bunchRefCache) writeRefs(idRefs idRefBunches) error {
for item := range putc {
batch.Put(item.bunchIdBuf, item.data)
bytePool.release(item.data)
}
go func() {
@ -524,7 +539,9 @@ func (index *bunchRefCache) loadMergeMarshal(keyBuf []byte, newBunch []element.I
var bunch []element.IdRefs
if data != nil {
bunch = binary.UnmarshalIdRefsBunch(data)
bunch = idRefsPool.get()
defer idRefsPool.release(bunch)
bunch = binary.UnmarshalIdRefsBunch2(data, bunch)
}
if bunch == nil {
@ -533,6 +550,47 @@ func (index *bunchRefCache) loadMergeMarshal(keyBuf []byte, newBunch []element.I
bunch = mergeBunch(bunch, newBunch)
}
data = binary.MarshalIdRefsBunch(bunch)
data = bytePool.get()
data = binary.MarshalIdRefsBunch2(bunch, data)
return data
}
// pools to reuse memory
var idRefsPool = make(idRefsPoolWrapper, 8)
var bytePool = make(bytePoolWrapper, 8)
type bytePoolWrapper chan []byte
func (p *bytePoolWrapper) get() []byte {
select {
case buf := <-(*p):
return buf
default:
return nil
}
}
func (p *bytePoolWrapper) release(buf []byte) {
select {
case (*p) <- buf:
default:
}
}
type idRefsPoolWrapper chan []element.IdRefs
func (p *idRefsPoolWrapper) get() []element.IdRefs {
select {
case idRefs := <-(*p):
return idRefs
default:
return nil
}
}
func (p *idRefsPoolWrapper) release(idRefs []element.IdRefs) {
select {
case (*p) <- idRefs:
default:
}
}