load refs in parallel
parent
3d7a38c817
commit
4d85c03f92
|
@ -9,6 +9,7 @@ import (
|
|||
"log"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"sort"
|
||||
"sync"
|
||||
)
|
||||
|
@ -206,15 +207,50 @@ func (index *RefIndex) addToCache(id, ref int64) {
|
|||
index.cache[id] = refs
|
||||
}
|
||||
|
||||
type writeRefItem struct {
|
||||
key []byte
|
||||
data []byte
|
||||
}
|
||||
type loadRefItem struct {
|
||||
id int64
|
||||
refs []int64
|
||||
}
|
||||
|
||||
func (index *RefIndex) writeRefs(idRefs map[int64][]int64) error {
|
||||
batch := levigo.NewWriteBatch()
|
||||
defer batch.Close()
|
||||
|
||||
for id, refs := range idRefs {
|
||||
keyBuf := idToKeyBuf(id)
|
||||
data := index.loadAppendMarshal(id, refs)
|
||||
batch.Put(keyBuf, data)
|
||||
wg := sync.WaitGroup{}
|
||||
putc := make(chan writeRefItem)
|
||||
loadc := make(chan loadRefItem)
|
||||
|
||||
for i := 0; i < runtime.NumCPU(); i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
for item := range loadc {
|
||||
keyBuf := idToKeyBuf(item.id)
|
||||
putc <- writeRefItem{
|
||||
keyBuf,
|
||||
index.loadAppendMarshal(keyBuf, item.refs),
|
||||
}
|
||||
}
|
||||
wg.Done()
|
||||
}()
|
||||
}
|
||||
|
||||
go func() {
|
||||
for id, refs := range idRefs {
|
||||
loadc <- loadRefItem{id, refs}
|
||||
}
|
||||
close(loadc)
|
||||
wg.Wait()
|
||||
close(putc)
|
||||
}()
|
||||
|
||||
for item := range putc {
|
||||
batch.Put(item.key, item.data)
|
||||
}
|
||||
|
||||
go func() {
|
||||
for k, _ := range idRefs {
|
||||
delete(idRefs, k)
|
||||
|
@ -226,8 +262,7 @@ func (index *RefIndex) writeRefs(idRefs map[int64][]int64) error {
|
|||
return index.db.Write(index.wo, batch)
|
||||
|
||||
}
|
||||
func (index *RefIndex) loadAppendMarshal(id int64, newRefs []int64) []byte {
|
||||
keyBuf := idToKeyBuf(id)
|
||||
func (index *RefIndex) loadAppendMarshal(keyBuf []byte, newRefs []int64) []byte {
|
||||
data, err := index.db.Get(index.ro, keyBuf)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
|
|
Loading…
Reference in New Issue