optimize diff bunches
parent
0a0a079d93
commit
64b2689ab8
|
@ -175,7 +175,7 @@ func TestWriteDiffBunch(t *testing.T) {
|
|||
|
||||
for w := 0; w < 5; w++ {
|
||||
for n := 0; n < 200; n++ {
|
||||
cache.addToCache(int64(n), int64(w))
|
||||
cache.cache.add(1, int64(n), int64(w))
|
||||
}
|
||||
}
|
||||
cache.Close()
|
||||
|
@ -191,3 +191,46 @@ func TestWriteDiffBunch(t *testing.T) {
|
|||
cache.Close()
|
||||
|
||||
}
|
||||
|
||||
func TestIdRefBunches(t *testing.T) {
|
||||
bunches := make(IdRefBunches)
|
||||
bunches.add(1, 100, 999)
|
||||
|
||||
if r := bunches[1].idRefs[0]; r.id != 100 || r.refs[0] != 999 {
|
||||
t.Fatal(bunches)
|
||||
}
|
||||
|
||||
// same id
|
||||
bunches.add(1, 100, 998)
|
||||
if r := bunches[1].idRefs[0]; r.id != 100 || r.refs[0] != 998 || r.refs[1] != 999 {
|
||||
t.Fatal(bunches)
|
||||
}
|
||||
|
||||
// before
|
||||
bunches.add(1, 99, 888)
|
||||
if r := bunches[1].idRefs[0]; r.id != 99 || r.refs[0] != 888 {
|
||||
t.Fatal(bunches)
|
||||
}
|
||||
|
||||
// after
|
||||
bunches.add(1, 102, 777)
|
||||
if r := bunches[1].idRefs[2]; r.id != 102 || r.refs[0] != 777 {
|
||||
t.Fatal(bunches)
|
||||
}
|
||||
|
||||
// in between
|
||||
bunches.add(1, 101, 666)
|
||||
if r := bunches[1].idRefs[2]; r.id != 101 || r.refs[0] != 666 {
|
||||
t.Fatal(bunches)
|
||||
}
|
||||
|
||||
if len(bunches) != 1 {
|
||||
t.Fatal(bunches)
|
||||
}
|
||||
if bunches[1].id != 1 {
|
||||
t.Fatal(bunches)
|
||||
}
|
||||
if len(bunches[1].idRefs) != 4 {
|
||||
t.Fatal(bunches)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -7,33 +7,37 @@ import (
|
|||
"goposm/element"
|
||||
"log"
|
||||
"runtime"
|
||||
"sort"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type bunchCache map[int64]RefBunch
|
||||
|
||||
type BunchRefCache struct {
|
||||
Cache
|
||||
cache bunchCache
|
||||
write chan bunchCache
|
||||
cache IdRefBunches
|
||||
write chan IdRefBunches
|
||||
add chan idRef
|
||||
mu sync.Mutex
|
||||
waitAdd *sync.WaitGroup
|
||||
waitWrite *sync.WaitGroup
|
||||
}
|
||||
|
||||
var IdRefBunchesCache chan IdRefBunches
|
||||
|
||||
func init() {
|
||||
IdRefBunchesCache = make(chan IdRefBunches, 1)
|
||||
}
|
||||
|
||||
type IdRef struct {
|
||||
id int64
|
||||
refs []int64
|
||||
}
|
||||
|
||||
var bunchCaches chan bunchCache
|
||||
|
||||
func init() {
|
||||
bunchCaches = make(chan bunchCache, 1)
|
||||
type IdRefBunch struct {
|
||||
id int64
|
||||
idRefs []IdRef
|
||||
}
|
||||
|
||||
type RefBunch map[int64][]int64
|
||||
type IdRefBunches map[int64]IdRefBunch
|
||||
|
||||
func NewBunchRefCache(path string, opts *CacheOptions) (*BunchRefCache, error) {
|
||||
index := BunchRefCache{}
|
||||
|
@ -42,8 +46,8 @@ func NewBunchRefCache(path string, opts *CacheOptions) (*BunchRefCache, error) {
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
index.write = make(chan bunchCache, 2)
|
||||
index.cache = make(bunchCache, cacheSize)
|
||||
index.write = make(chan IdRefBunches, 2)
|
||||
index.cache = make(IdRefBunches, cacheSize)
|
||||
index.add = make(chan idRef, 1024)
|
||||
|
||||
index.waitWrite = &sync.WaitGroup{}
|
||||
|
@ -75,13 +79,13 @@ func (index *BunchRefCache) Close() {
|
|||
|
||||
func (index *BunchRefCache) dispatch() {
|
||||
for idRef := range index.add {
|
||||
index.addToCache(idRef.id, idRef.ref)
|
||||
index.cache.add(index.getBunchId(idRef.id), idRef.id, idRef.ref)
|
||||
if len(index.cache) >= cacheSize {
|
||||
index.write <- index.cache
|
||||
select {
|
||||
case index.cache = <-bunchCaches:
|
||||
case index.cache = <-IdRefBunchesCache:
|
||||
default:
|
||||
index.cache = make(bunchCache, cacheSize)
|
||||
index.cache = make(IdRefBunches, cacheSize)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -102,26 +106,37 @@ func (index *BunchRefCache) getBunchId(id int64) int64 {
|
|||
return id / 64
|
||||
}
|
||||
|
||||
func (index *BunchRefCache) addToCache(id, ref int64) {
|
||||
bunchId := index.getBunchId(id)
|
||||
|
||||
bunch, ok := index.cache[bunchId]
|
||||
func (bunches *IdRefBunches) add(bunchId, id, ref int64) {
|
||||
bunch, ok := (*bunches)[bunchId]
|
||||
if !ok {
|
||||
bunch = RefBunch{}
|
||||
bunch = IdRefBunch{id: bunchId}
|
||||
}
|
||||
var idRef *IdRef
|
||||
|
||||
i := sort.Search(len(bunch.idRefs), func(i int) bool {
|
||||
return bunch.idRefs[i].id >= id
|
||||
})
|
||||
if i < len(bunch.idRefs) && bunch.idRefs[i].id >= id {
|
||||
if bunch.idRefs[i].id == id {
|
||||
idRef = &bunch.idRefs[i]
|
||||
} else {
|
||||
bunch.idRefs = append(bunch.idRefs, IdRef{})
|
||||
copy(bunch.idRefs[i+1:], bunch.idRefs[i:])
|
||||
bunch.idRefs[i] = IdRef{id: id}
|
||||
idRef = &bunch.idRefs[i]
|
||||
}
|
||||
} else {
|
||||
bunch.idRefs = append(bunch.idRefs, IdRef{id: id})
|
||||
idRef = &bunch.idRefs[len(bunch.idRefs)-1]
|
||||
}
|
||||
|
||||
refs, ok := bunch[id]
|
||||
if !ok {
|
||||
refs = make([]int64, 0, 1)
|
||||
}
|
||||
refs = insertRefs(refs, ref)
|
||||
bunch[id] = refs
|
||||
index.cache[bunchId] = bunch
|
||||
idRef.refs = insertRefs(idRef.refs, ref)
|
||||
(*bunches)[bunchId] = bunch
|
||||
}
|
||||
|
||||
type loadBunchItem struct {
|
||||
bunchId int64
|
||||
bunch RefBunch
|
||||
bunch IdRefBunch
|
||||
}
|
||||
|
||||
type writeBunchItem struct {
|
||||
|
@ -129,7 +144,7 @@ type writeBunchItem struct {
|
|||
data []byte
|
||||
}
|
||||
|
||||
func (index *BunchRefCache) writeRefs(idRefs bunchCache) error {
|
||||
func (index *BunchRefCache) writeRefs(idRefs IdRefBunches) error {
|
||||
batch := levigo.NewWriteBatch()
|
||||
defer batch.Close()
|
||||
|
||||
|
@ -142,14 +157,9 @@ func (index *BunchRefCache) writeRefs(idRefs bunchCache) error {
|
|||
go func() {
|
||||
for item := range loadc {
|
||||
keyBuf := idToKeyBuf(item.bunchId)
|
||||
bunchList := make([]IdRef, len(item.bunch))
|
||||
for id, refs := range item.bunch {
|
||||
bunchList = append(bunchList, IdRef{id, refs})
|
||||
}
|
||||
// TODO
|
||||
putc <- writeBunchItem{
|
||||
keyBuf,
|
||||
index.loadMergeMarshal(keyBuf, bunchList),
|
||||
index.loadMergeMarshal(keyBuf, item.bunch.idRefs),
|
||||
}
|
||||
}
|
||||
wg.Done()
|
||||
|
@ -174,7 +184,7 @@ func (index *BunchRefCache) writeRefs(idRefs bunchCache) error {
|
|||
delete(idRefs, k)
|
||||
}
|
||||
select {
|
||||
case bunchCaches <- idRefs:
|
||||
case IdRefBunchesCache <- idRefs:
|
||||
}
|
||||
}()
|
||||
return index.db.Write(index.wo, batch)
|
||||
|
|
Loading…
Reference in New Issue