refactored diff cache
parent
0f09649775
commit
d092ba7807
|
@ -5,7 +5,6 @@ import (
|
|||
"encoding/binary"
|
||||
"github.com/jmhodges/levigo"
|
||||
"goposm/element"
|
||||
"io"
|
||||
"log"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
@ -27,6 +26,11 @@ type DiffCache struct {
|
|||
opened bool
|
||||
}
|
||||
|
||||
func NewDiffCache(dir string) *DiffCache {
|
||||
cache := &DiffCache{Dir: dir}
|
||||
return cache
|
||||
}
|
||||
|
||||
func (c *DiffCache) Close() {
|
||||
if c.Coords != nil {
|
||||
c.Coords.Close()
|
||||
|
@ -38,19 +42,14 @@ func (c *DiffCache) Close() {
|
|||
}
|
||||
}
|
||||
|
||||
func NewDiffCache(dir string) *DiffCache {
|
||||
cache := &DiffCache{Dir: dir}
|
||||
return cache
|
||||
}
|
||||
|
||||
func (c *DiffCache) Open() error {
|
||||
var err error
|
||||
c.Coords, err = NewCoordsRefIndex(filepath.Join(c.Dir, "coords_index"))
|
||||
c.Coords, err = newCoordsRefIndex(filepath.Join(c.Dir, "coords_index"))
|
||||
if err != nil {
|
||||
c.Close()
|
||||
return err
|
||||
}
|
||||
c.Ways, err = NewWaysRefIndex(filepath.Join(c.Dir, "ways_index"))
|
||||
c.Ways, err = newWaysRefIndex(filepath.Join(c.Dir, "ways_index"))
|
||||
if err != nil {
|
||||
c.Close()
|
||||
return err
|
||||
|
@ -85,45 +84,95 @@ func (c *DiffCache) Remove() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
type RefIndex struct {
|
||||
type idRef struct {
|
||||
id int64
|
||||
ref int64
|
||||
}
|
||||
|
||||
const bufferSize = 64 * 1024
|
||||
|
||||
// insertRefs returns refs with ref placed at its sorted position.
//
// refs is expected to be sorted ascending. A value that is already present
// is inserted again (duplicates are kept), directly before the existing one.
// The returned slice may share its backing array with refs.
func insertRefs(refs []int64, ref int64) []int64 {
	pos := sort.Search(len(refs), func(k int) bool {
		return ref <= refs[k]
	})
	if pos >= len(refs) || refs[pos] < ref {
		// Nothing greater or equal: ref becomes the new last element.
		return append(refs, ref)
	}
	// Grow by one, shift the tail right and drop ref into the gap.
	refs = append(refs, 0)
	copy(refs[pos+1:], refs[pos:])
	refs[pos] = ref
	return refs
}
|
||||
|
||||
type IdRef struct {
|
||||
id int64
|
||||
refs []int64
|
||||
}
|
||||
|
||||
// IdRefBunch stores multiple IdRefs
|
||||
type IdRefBunch struct {
|
||||
id int64 // the bunch id
|
||||
idRefs []IdRef
|
||||
}
|
||||
|
||||
// IdRefBunches can hold multiple IdRefBunch
|
||||
type IdRefBunches map[int64]IdRefBunch
|
||||
|
||||
func (bunches *IdRefBunches) add(bunchId, id, ref int64) {
|
||||
bunch, ok := (*bunches)[bunchId]
|
||||
if !ok {
|
||||
bunch = IdRefBunch{id: bunchId}
|
||||
}
|
||||
var idRef *IdRef
|
||||
|
||||
i := sort.Search(len(bunch.idRefs), func(i int) bool {
|
||||
return bunch.idRefs[i].id >= id
|
||||
})
|
||||
if i < len(bunch.idRefs) && bunch.idRefs[i].id >= id {
|
||||
if bunch.idRefs[i].id == id {
|
||||
idRef = &bunch.idRefs[i]
|
||||
} else {
|
||||
bunch.idRefs = append(bunch.idRefs, IdRef{})
|
||||
copy(bunch.idRefs[i+1:], bunch.idRefs[i:])
|
||||
bunch.idRefs[i] = IdRef{id: id}
|
||||
idRef = &bunch.idRefs[i]
|
||||
}
|
||||
} else {
|
||||
bunch.idRefs = append(bunch.idRefs, IdRef{id: id})
|
||||
idRef = &bunch.idRefs[len(bunch.idRefs)-1]
|
||||
}
|
||||
|
||||
idRef.refs = insertRefs(idRef.refs, ref)
|
||||
(*bunches)[bunchId] = bunch
|
||||
}
|
||||
|
||||
var IdRefBunchesPool chan IdRefBunches
|
||||
|
||||
func init() {
|
||||
IdRefBunchesPool = make(chan IdRefBunches, 1)
|
||||
}
|
||||
|
||||
// BunchRefCache
|
||||
type BunchRefCache struct {
|
||||
cache
|
||||
buffer map[int64][]int64
|
||||
write chan map[int64][]int64
|
||||
buffer IdRefBunches
|
||||
write chan IdRefBunches
|
||||
add chan idRef
|
||||
mu sync.Mutex
|
||||
waitAdd *sync.WaitGroup
|
||||
waitWrite *sync.WaitGroup
|
||||
}
|
||||
|
||||
type CoordsRefIndex struct {
|
||||
RefIndex
|
||||
}
|
||||
type WaysRefIndex struct {
|
||||
RefIndex
|
||||
}
|
||||
|
||||
type idRef struct {
|
||||
id int64
|
||||
ref int64
|
||||
}
|
||||
|
||||
const cacheSize = 64 * 1024
|
||||
|
||||
var refCaches chan map[int64][]int64
|
||||
|
||||
func init() {
|
||||
refCaches = make(chan map[int64][]int64, 1)
|
||||
}
|
||||
|
||||
func NewRefIndex(path string, opts *cacheOptions) (*RefIndex, error) {
|
||||
index := RefIndex{}
|
||||
func newRefIndex(path string, opts *cacheOptions) (*BunchRefCache, error) {
|
||||
index := BunchRefCache{}
|
||||
index.options = opts
|
||||
err := index.open(path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
index.write = make(chan map[int64][]int64, 2)
|
||||
index.buffer = make(map[int64][]int64, cacheSize)
|
||||
index.write = make(chan IdRefBunches, 2)
|
||||
index.buffer = make(IdRefBunches, bufferSize)
|
||||
index.add = make(chan idRef, 1024)
|
||||
|
||||
index.waitWrite = &sync.WaitGroup{}
|
||||
|
@ -136,58 +185,29 @@ func NewRefIndex(path string, opts *cacheOptions) (*RefIndex, error) {
|
|||
return &index, nil
|
||||
}
|
||||
|
||||
func NewCoordsRefIndex(dir string) (*CoordsRefIndex, error) {
|
||||
cache, err := NewRefIndex(dir, &globalCacheOptions.CoordsIndex)
|
||||
type CoordsRefIndex struct {
|
||||
BunchRefCache
|
||||
}
|
||||
type WaysRefIndex struct {
|
||||
BunchRefCache
|
||||
}
|
||||
|
||||
func newCoordsRefIndex(dir string) (*CoordsRefIndex, error) {
|
||||
cache, err := newRefIndex(dir, &globalCacheOptions.CoordsIndex)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &CoordsRefIndex{*cache}, nil
|
||||
}
|
||||
|
||||
func NewWaysRefIndex(dir string) (*WaysRefIndex, error) {
|
||||
cache, err := NewRefIndex(dir, &globalCacheOptions.WaysIndex)
|
||||
func newWaysRefIndex(dir string) (*WaysRefIndex, error) {
|
||||
cache, err := newRefIndex(dir, &globalCacheOptions.WaysIndex)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &WaysRefIndex{*cache}, nil
|
||||
}
|
||||
|
||||
func (index *RefIndex) writer() {
|
||||
for buffer := range index.write {
|
||||
if err := index.writeRefs(buffer); err != nil {
|
||||
log.Println("error while writing ref index", err)
|
||||
}
|
||||
}
|
||||
index.waitWrite.Done()
|
||||
}
|
||||
|
||||
func (index *RefIndex) Close() {
|
||||
close(index.add)
|
||||
index.waitAdd.Wait()
|
||||
close(index.write)
|
||||
index.waitWrite.Wait()
|
||||
index.cache.Close()
|
||||
}
|
||||
|
||||
func (index *RefIndex) dispatch() {
|
||||
for idRef := range index.add {
|
||||
index.addToCache(idRef.id, idRef.ref)
|
||||
if len(index.buffer) >= cacheSize {
|
||||
index.write <- index.buffer
|
||||
select {
|
||||
case index.buffer = <-refCaches:
|
||||
default:
|
||||
index.buffer = make(map[int64][]int64, cacheSize)
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(index.buffer) > 0 {
|
||||
index.write <- index.buffer
|
||||
index.buffer = nil
|
||||
}
|
||||
index.waitAdd.Done()
|
||||
}
|
||||
|
||||
func (index *CoordsRefIndex) AddFromWay(way *element.Way) {
|
||||
for _, node := range way.Nodes {
|
||||
index.add <- idRef{node.Id, way.Id}
|
||||
|
@ -202,41 +222,78 @@ func (index *WaysRefIndex) AddFromMembers(relId int64, members []element.Member)
|
|||
}
|
||||
}
|
||||
|
||||
func (index *RefIndex) addToCache(id, ref int64) {
|
||||
refs, ok := index.buffer[id]
|
||||
if !ok {
|
||||
refs = make([]int64, 0, 1)
|
||||
func (index *BunchRefCache) writer() {
|
||||
for buffer := range index.write {
|
||||
if err := index.writeRefs(buffer); err != nil {
|
||||
log.Println("error while writing ref index", err)
|
||||
}
|
||||
}
|
||||
refs = insertRefs(refs, ref)
|
||||
|
||||
index.buffer[id] = refs
|
||||
index.waitWrite.Done()
|
||||
}
|
||||
|
||||
type writeRefItem struct {
|
||||
key []byte
|
||||
data []byte
|
||||
}
|
||||
type loadRefItem struct {
|
||||
id int64
|
||||
refs []int64
|
||||
func (index *BunchRefCache) Close() {
|
||||
close(index.add)
|
||||
index.waitAdd.Wait()
|
||||
close(index.write)
|
||||
index.waitWrite.Wait()
|
||||
index.cache.Close()
|
||||
}
|
||||
|
||||
func (index *RefIndex) writeRefs(idRefs map[int64][]int64) error {
|
||||
func (index *BunchRefCache) dispatch() {
|
||||
for idRef := range index.add {
|
||||
index.buffer.add(index.getBunchId(idRef.id), idRef.id, idRef.ref)
|
||||
if len(index.buffer) >= bufferSize {
|
||||
index.write <- index.buffer
|
||||
select {
|
||||
case index.buffer = <-IdRefBunchesPool:
|
||||
default:
|
||||
index.buffer = make(IdRefBunches, bufferSize)
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(index.buffer) > 0 {
|
||||
index.write <- index.buffer
|
||||
index.buffer = nil
|
||||
}
|
||||
index.waitAdd.Done()
|
||||
}
|
||||
|
||||
func (index *BunchRefCache) AddFromWay(way *element.Way) {
|
||||
for _, node := range way.Nodes {
|
||||
index.add <- idRef{node.Id, way.Id}
|
||||
}
|
||||
}
|
||||
|
||||
func (index *BunchRefCache) getBunchId(id int64) int64 {
|
||||
return id / 64
|
||||
}
|
||||
|
||||
type loadBunchItem struct {
|
||||
bunchId int64
|
||||
bunch IdRefBunch
|
||||
}
|
||||
|
||||
type writeBunchItem struct {
|
||||
bunchIdBuf []byte
|
||||
data []byte
|
||||
}
|
||||
|
||||
func (index *BunchRefCache) writeRefs(idRefs IdRefBunches) error {
|
||||
batch := levigo.NewWriteBatch()
|
||||
defer batch.Close()
|
||||
|
||||
wg := sync.WaitGroup{}
|
||||
putc := make(chan writeRefItem)
|
||||
loadc := make(chan loadRefItem)
|
||||
putc := make(chan writeBunchItem)
|
||||
loadc := make(chan loadBunchItem)
|
||||
|
||||
for i := 0; i < runtime.NumCPU(); i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
for item := range loadc {
|
||||
keyBuf := idToKeyBuf(item.id)
|
||||
putc <- writeRefItem{
|
||||
keyBuf := idToKeyBuf(item.bunchId)
|
||||
putc <- writeBunchItem{
|
||||
keyBuf,
|
||||
index.loadAppendMarshal(keyBuf, item.refs),
|
||||
index.loadMergeMarshal(keyBuf, item.bunch.idRefs),
|
||||
}
|
||||
}
|
||||
wg.Done()
|
||||
|
@ -244,8 +301,8 @@ func (index *RefIndex) writeRefs(idRefs map[int64][]int64) error {
|
|||
}
|
||||
|
||||
go func() {
|
||||
for id, refs := range idRefs {
|
||||
loadc <- loadRefItem{id, refs}
|
||||
for bunchId, bunch := range idRefs {
|
||||
loadc <- loadBunchItem{bunchId, bunch}
|
||||
}
|
||||
close(loadc)
|
||||
wg.Wait()
|
||||
|
@ -253,7 +310,7 @@ func (index *RefIndex) writeRefs(idRefs map[int64][]int64) error {
|
|||
}()
|
||||
|
||||
for item := range putc {
|
||||
batch.Put(item.key, item.data)
|
||||
batch.Put(item.bunchIdBuf, item.data)
|
||||
}
|
||||
|
||||
go func() {
|
||||
|
@ -261,101 +318,161 @@ func (index *RefIndex) writeRefs(idRefs map[int64][]int64) error {
|
|||
delete(idRefs, k)
|
||||
}
|
||||
select {
|
||||
case refCaches <- idRefs:
|
||||
case IdRefBunchesPool <- idRefs:
|
||||
}
|
||||
}()
|
||||
return index.db.Write(index.wo, batch)
|
||||
|
||||
}
|
||||
func (index *RefIndex) loadAppendMarshal(keyBuf []byte, newRefs []int64) []byte {
|
||||
|
||||
func mergeBunch(bunch, newBunch []IdRef) []IdRef {
|
||||
lastIdx := 0
|
||||
|
||||
NextIdRef:
|
||||
// for each new IdRef...
|
||||
for _, newIdRefs := range newBunch {
|
||||
// search place in bunch
|
||||
for i := lastIdx; i < len(bunch); i++ {
|
||||
if bunch[i].id == newIdRefs.id {
|
||||
// id already present, add refs
|
||||
for _, r := range newIdRefs.refs {
|
||||
bunch[i].refs = insertRefs(bunch[i].refs, r)
|
||||
}
|
||||
lastIdx = i
|
||||
break NextIdRef
|
||||
}
|
||||
if bunch[i].id > newIdRefs.id {
|
||||
// insert before
|
||||
bunch = append(bunch, IdRef{})
|
||||
copy(bunch[i+1:], bunch[i:])
|
||||
bunch[i] = newIdRefs
|
||||
lastIdx = i
|
||||
break NextIdRef
|
||||
}
|
||||
}
|
||||
// insert at the end
|
||||
bunch = append(bunch, newIdRefs)
|
||||
lastIdx = len(bunch) - 1
|
||||
}
|
||||
return bunch
|
||||
}
|
||||
|
||||
func (index *BunchRefCache) loadMergeMarshal(keyBuf []byte, newBunch []IdRef) []byte {
|
||||
data, err := index.db.Get(index.ro, keyBuf)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
var refs []int64
|
||||
var bunch []IdRef
|
||||
|
||||
if data != nil {
|
||||
refs = UnmarshalRefs(data)
|
||||
bunch = UnmarshalBunch(data)
|
||||
}
|
||||
|
||||
if refs == nil {
|
||||
refs = newRefs
|
||||
if bunch == nil {
|
||||
bunch = newBunch
|
||||
} else {
|
||||
refs = append(refs, newRefs...)
|
||||
sort.Sort(byInt64(refs))
|
||||
bunch = mergeBunch(bunch, newBunch)
|
||||
}
|
||||
|
||||
data = MarshalRefs(refs)
|
||||
data = MarshalBunch(bunch)
|
||||
return data
|
||||
}
|
||||
|
||||
func insertRefs(refs []int64, ref int64) []int64 {
|
||||
i := sort.Search(len(refs), func(i int) bool {
|
||||
return refs[i] >= ref
|
||||
})
|
||||
if i < len(refs) && refs[i] >= ref {
|
||||
refs = append(refs, 0)
|
||||
copy(refs[i+1:], refs[i:])
|
||||
refs[i] = ref
|
||||
} else {
|
||||
refs = append(refs, ref)
|
||||
}
|
||||
return refs
|
||||
}
|
||||
func (index *BunchRefCache) Get(id int64) []int64 {
|
||||
keyBuf := idToKeyBuf(index.getBunchId(id))
|
||||
|
||||
func (index *RefIndex) Get(id int64) []int64 {
|
||||
keyBuf := idToKeyBuf(id)
|
||||
data, err := index.db.Get(index.ro, keyBuf)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
var refs []int64
|
||||
|
||||
if data != nil {
|
||||
refs = UnmarshalRefs(data)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
for _, idRef := range UnmarshalBunch(data) {
|
||||
if idRef.id == id {
|
||||
return idRef.refs
|
||||
}
|
||||
}
|
||||
}
|
||||
return refs
|
||||
return nil
|
||||
}
|
||||
|
||||
func UnmarshalRefs(buf []byte) []int64 {
|
||||
refs := make([]int64, 0, 8)
|
||||
|
||||
r := bytes.NewBuffer(buf)
|
||||
|
||||
lastRef := int64(0)
|
||||
for {
|
||||
ref, err := binary.ReadVarint(r)
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
if err != nil {
|
||||
log.Println("error while unmarshaling refs:", err)
|
||||
break
|
||||
}
|
||||
ref = lastRef + ref
|
||||
refs = append(refs, ref)
|
||||
lastRef = ref
|
||||
}
|
||||
|
||||
return refs
|
||||
}
|
||||
|
||||
func MarshalRefs(refs []int64) []byte {
|
||||
buf := make([]byte, len(refs)*4+binary.MaxVarintLen64)
|
||||
func MarshalBunch(idRefs []IdRef) []byte {
|
||||
buf := make([]byte, len(idRefs)*(4+1+6)+binary.MaxVarintLen64)
|
||||
|
||||
lastRef := int64(0)
|
||||
lastId := int64(0)
|
||||
nextPos := 0
|
||||
for _, ref := range refs {
|
||||
|
||||
nextPos += binary.PutUvarint(buf[nextPos:], uint64(len(idRefs)))
|
||||
|
||||
for _, idRef := range idRefs {
|
||||
if len(buf)-nextPos < binary.MaxVarintLen64 {
|
||||
tmp := make([]byte, len(buf)*2)
|
||||
copy(tmp, buf)
|
||||
buf = tmp
|
||||
}
|
||||
nextPos += binary.PutVarint(buf[nextPos:], ref-lastRef)
|
||||
lastRef = ref
|
||||
nextPos += binary.PutVarint(buf[nextPos:], idRef.id-lastId)
|
||||
lastId = idRef.id
|
||||
}
|
||||
for _, idRef := range idRefs {
|
||||
if len(buf)-nextPos < binary.MaxVarintLen64 {
|
||||
tmp := make([]byte, len(buf)*2)
|
||||
copy(tmp, buf)
|
||||
buf = tmp
|
||||
}
|
||||
nextPos += binary.PutUvarint(buf[nextPos:], uint64(len(idRef.refs)))
|
||||
}
|
||||
for _, idRef := range idRefs {
|
||||
for _, ref := range idRef.refs {
|
||||
if len(buf)-nextPos < binary.MaxVarintLen64 {
|
||||
tmp := make([]byte, len(buf)*2)
|
||||
copy(tmp, buf)
|
||||
buf = tmp
|
||||
}
|
||||
nextPos += binary.PutVarint(buf[nextPos:], ref-lastRef)
|
||||
lastRef = ref
|
||||
}
|
||||
}
|
||||
return buf[:nextPos]
|
||||
}
|
||||
|
||||
func UnmarshalBunch(buf []byte) []IdRef {
|
||||
|
||||
r := bytes.NewBuffer(buf)
|
||||
n, err := binary.ReadUvarint(r)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
idRefs := make([]IdRef, n)
|
||||
|
||||
last := int64(0)
|
||||
for i := 0; uint64(i) < n; i++ {
|
||||
idRefs[i].id, err = binary.ReadVarint(r)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
idRefs[i].id += last
|
||||
last = idRefs[i].id
|
||||
}
|
||||
var numRefs uint64
|
||||
for i := 0; uint64(i) < n; i++ {
|
||||
numRefs, err = binary.ReadUvarint(r)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
idRefs[i].refs = make([]int64, numRefs)
|
||||
}
|
||||
last = 0
|
||||
for idIdx := 0; uint64(idIdx) < n; idIdx++ {
|
||||
for refIdx := 0; refIdx < len(idRefs[idIdx].refs); refIdx++ {
|
||||
idRefs[idIdx].refs[refIdx], err = binary.ReadVarint(r)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
idRefs[idIdx].refs[refIdx] += last
|
||||
last = idRefs[idIdx].refs[refIdx]
|
||||
}
|
||||
}
|
||||
return idRefs
|
||||
}
|
||||
|
|
|
@ -46,41 +46,42 @@ func TestInsertRefs(t *testing.T) {
|
|||
|
||||
}
|
||||
|
||||
func TestMarshalRefs(t *testing.T) {
|
||||
refs := []int64{1890166659, -1890166659, 0, 1890166, 1890167, 1890167, 1890165}
|
||||
buf := MarshalRefs(refs)
|
||||
// TODO write test for (Un)MarshalBunch
|
||||
// func TestMarshalRefs(t *testing.T) {
|
||||
// refs := []int64{1890166659, -1890166659, 0, 1890166, 1890167, 1890167, 1890165}
|
||||
// buf := MarshalRefs(refs)
|
||||
|
||||
t.Log(len(refs), len(buf))
|
||||
result := UnmarshalRefs(buf)
|
||||
// t.Log(len(refs), len(buf))
|
||||
// result := UnmarshalRefs(buf)
|
||||
|
||||
if len(result) != len(refs) {
|
||||
t.Fatal(result)
|
||||
}
|
||||
for i, ref := range refs {
|
||||
if result[i] != ref {
|
||||
t.Fatal(result)
|
||||
}
|
||||
}
|
||||
// if len(result) != len(refs) {
|
||||
// t.Fatal(result)
|
||||
// }
|
||||
// for i, ref := range refs {
|
||||
// if result[i] != ref {
|
||||
// t.Fatal(result)
|
||||
// }
|
||||
// }
|
||||
|
||||
}
|
||||
// }
|
||||
|
||||
func TestWriteDiff(t *testing.T) {
|
||||
cache_dir, _ := ioutil.TempDir("", "goposm_test")
|
||||
defer os.RemoveAll(cache_dir)
|
||||
|
||||
cache, err := NewRefIndex(cache_dir, &globalCacheOptions.CoordsIndex)
|
||||
cache, err := newRefIndex(cache_dir, &globalCacheOptions.CoordsIndex)
|
||||
if err != nil {
|
||||
t.Fatal()
|
||||
}
|
||||
|
||||
for w := 0; w < 5; w++ {
|
||||
for n := 0; n < 200; n++ {
|
||||
cache.addToCache(int64(n), int64(w))
|
||||
cache.buffer.add(cache.getBunchId(int64(n)), int64(n), int64(w))
|
||||
}
|
||||
}
|
||||
cache.Close()
|
||||
|
||||
cache, err = NewRefIndex(cache_dir, &globalCacheOptions.CoordsIndex)
|
||||
cache, err = newRefIndex(cache_dir, &globalCacheOptions.CoordsIndex)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -100,7 +101,7 @@ func BenchmarkWriteDiff(b *testing.B) {
|
|||
cache_dir, _ := ioutil.TempDir("", "goposm_test")
|
||||
defer os.RemoveAll(cache_dir)
|
||||
|
||||
cache, err := NewRefIndex(cache_dir, &globalCacheOptions.CoordsIndex)
|
||||
cache, err := newRefIndex(cache_dir, &globalCacheOptions.CoordsIndex)
|
||||
if err != nil {
|
||||
b.Fatal()
|
||||
}
|
||||
|
@ -110,7 +111,7 @@ func BenchmarkWriteDiff(b *testing.B) {
|
|||
for i := 0; i < b.N; i++ {
|
||||
for w := 0; w < 5; w++ {
|
||||
for n := 0; n < 200; n++ {
|
||||
cache.addToCache(int64(n), int64(w))
|
||||
cache.buffer.add(1, int64(n), int64(w))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,347 +0,0 @@
|
|||
package cache
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
"github.com/jmhodges/levigo"
|
||||
"goposm/element"
|
||||
"log"
|
||||
"runtime"
|
||||
"sort"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type IdRef struct {
|
||||
id int64
|
||||
refs []int64
|
||||
}
|
||||
|
||||
// IdRefBunch stores multiple IdRefs
|
||||
type IdRefBunch struct {
|
||||
id int64 // the bunch id
|
||||
idRefs []IdRef
|
||||
}
|
||||
|
||||
// IdRefBunches can hold multiple IdRefBunch
|
||||
type IdRefBunches map[int64]IdRefBunch
|
||||
|
||||
func (bunches *IdRefBunches) add(bunchId, id, ref int64) {
|
||||
bunch, ok := (*bunches)[bunchId]
|
||||
if !ok {
|
||||
bunch = IdRefBunch{id: bunchId}
|
||||
}
|
||||
var idRef *IdRef
|
||||
|
||||
i := sort.Search(len(bunch.idRefs), func(i int) bool {
|
||||
return bunch.idRefs[i].id >= id
|
||||
})
|
||||
if i < len(bunch.idRefs) && bunch.idRefs[i].id >= id {
|
||||
if bunch.idRefs[i].id == id {
|
||||
idRef = &bunch.idRefs[i]
|
||||
} else {
|
||||
bunch.idRefs = append(bunch.idRefs, IdRef{})
|
||||
copy(bunch.idRefs[i+1:], bunch.idRefs[i:])
|
||||
bunch.idRefs[i] = IdRef{id: id}
|
||||
idRef = &bunch.idRefs[i]
|
||||
}
|
||||
} else {
|
||||
bunch.idRefs = append(bunch.idRefs, IdRef{id: id})
|
||||
idRef = &bunch.idRefs[len(bunch.idRefs)-1]
|
||||
}
|
||||
|
||||
idRef.refs = insertRefs(idRef.refs, ref)
|
||||
(*bunches)[bunchId] = bunch
|
||||
}
|
||||
|
||||
var IdRefBunchesPool chan IdRefBunches
|
||||
|
||||
func init() {
|
||||
IdRefBunchesPool = make(chan IdRefBunches, 1)
|
||||
}
|
||||
|
||||
// BunchRefCache
|
||||
type BunchRefCache struct {
|
||||
cache
|
||||
buffer IdRefBunches
|
||||
write chan IdRefBunches
|
||||
add chan idRef
|
||||
mu sync.Mutex
|
||||
waitAdd *sync.WaitGroup
|
||||
waitWrite *sync.WaitGroup
|
||||
}
|
||||
|
||||
func NewBunchRefCache(path string, opts *cacheOptions) (*BunchRefCache, error) {
|
||||
index := BunchRefCache{}
|
||||
index.options = opts
|
||||
err := index.open(path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
index.write = make(chan IdRefBunches, 2)
|
||||
index.buffer = make(IdRefBunches, cacheSize)
|
||||
index.add = make(chan idRef, 1024)
|
||||
|
||||
index.waitWrite = &sync.WaitGroup{}
|
||||
index.waitAdd = &sync.WaitGroup{}
|
||||
index.waitWrite.Add(1)
|
||||
index.waitAdd.Add(1)
|
||||
|
||||
go index.writer()
|
||||
go index.dispatch()
|
||||
return &index, nil
|
||||
}
|
||||
|
||||
func (index *BunchRefCache) writer() {
|
||||
for buffer := range index.write {
|
||||
if err := index.writeRefs(buffer); err != nil {
|
||||
log.Println("error while writing ref index", err)
|
||||
}
|
||||
}
|
||||
index.waitWrite.Done()
|
||||
}
|
||||
|
||||
func (index *BunchRefCache) Close() {
|
||||
close(index.add)
|
||||
index.waitAdd.Wait()
|
||||
close(index.write)
|
||||
index.waitWrite.Wait()
|
||||
index.cache.Close()
|
||||
}
|
||||
|
||||
func (index *BunchRefCache) dispatch() {
|
||||
for idRef := range index.add {
|
||||
index.buffer.add(index.getBunchId(idRef.id), idRef.id, idRef.ref)
|
||||
if len(index.buffer) >= cacheSize {
|
||||
index.write <- index.buffer
|
||||
select {
|
||||
case index.buffer = <-IdRefBunchesPool:
|
||||
default:
|
||||
index.buffer = make(IdRefBunches, cacheSize)
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(index.buffer) > 0 {
|
||||
index.write <- index.buffer
|
||||
index.buffer = nil
|
||||
}
|
||||
index.waitAdd.Done()
|
||||
}
|
||||
|
||||
func (index *BunchRefCache) AddFromWay(way *element.Way) {
|
||||
for _, node := range way.Nodes {
|
||||
index.add <- idRef{node.Id, way.Id}
|
||||
}
|
||||
}
|
||||
|
||||
func (index *BunchRefCache) getBunchId(id int64) int64 {
|
||||
return id / 64
|
||||
}
|
||||
|
||||
type loadBunchItem struct {
|
||||
bunchId int64
|
||||
bunch IdRefBunch
|
||||
}
|
||||
|
||||
type writeBunchItem struct {
|
||||
bunchIdBuf []byte
|
||||
data []byte
|
||||
}
|
||||
|
||||
func (index *BunchRefCache) writeRefs(idRefs IdRefBunches) error {
|
||||
batch := levigo.NewWriteBatch()
|
||||
defer batch.Close()
|
||||
|
||||
wg := sync.WaitGroup{}
|
||||
putc := make(chan writeBunchItem)
|
||||
loadc := make(chan loadBunchItem)
|
||||
|
||||
for i := 0; i < runtime.NumCPU(); i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
for item := range loadc {
|
||||
keyBuf := idToKeyBuf(item.bunchId)
|
||||
putc <- writeBunchItem{
|
||||
keyBuf,
|
||||
index.loadMergeMarshal(keyBuf, item.bunch.idRefs),
|
||||
}
|
||||
}
|
||||
wg.Done()
|
||||
}()
|
||||
}
|
||||
|
||||
go func() {
|
||||
for bunchId, bunch := range idRefs {
|
||||
loadc <- loadBunchItem{bunchId, bunch}
|
||||
}
|
||||
close(loadc)
|
||||
wg.Wait()
|
||||
close(putc)
|
||||
}()
|
||||
|
||||
for item := range putc {
|
||||
batch.Put(item.bunchIdBuf, item.data)
|
||||
}
|
||||
|
||||
go func() {
|
||||
for k, _ := range idRefs {
|
||||
delete(idRefs, k)
|
||||
}
|
||||
select {
|
||||
case IdRefBunchesPool <- idRefs:
|
||||
}
|
||||
}()
|
||||
return index.db.Write(index.wo, batch)
|
||||
}
|
||||
|
||||
func mergeBunch(bunch, newBunch []IdRef) []IdRef {
|
||||
lastIdx := 0
|
||||
|
||||
NextIdRef:
|
||||
// for each new IdRef...
|
||||
for _, newIdRefs := range newBunch {
|
||||
// search place in bunch
|
||||
for i := lastIdx; i < len(bunch); i++ {
|
||||
if bunch[i].id == newIdRefs.id {
|
||||
// id already present, add refs
|
||||
for _, r := range newIdRefs.refs {
|
||||
bunch[i].refs = insertRefs(bunch[i].refs, r)
|
||||
}
|
||||
lastIdx = i
|
||||
break NextIdRef
|
||||
}
|
||||
if bunch[i].id > newIdRefs.id {
|
||||
// insert before
|
||||
bunch = append(bunch, IdRef{})
|
||||
copy(bunch[i+1:], bunch[i:])
|
||||
bunch[i] = newIdRefs
|
||||
lastIdx = i
|
||||
break NextIdRef
|
||||
}
|
||||
}
|
||||
// insert at the end
|
||||
bunch = append(bunch, newIdRefs)
|
||||
lastIdx = len(bunch) - 1
|
||||
}
|
||||
return bunch
|
||||
}
|
||||
|
||||
func (index *BunchRefCache) loadMergeMarshal(keyBuf []byte, newBunch []IdRef) []byte {
|
||||
data, err := index.db.Get(index.ro, keyBuf)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
var bunch []IdRef
|
||||
|
||||
if data != nil {
|
||||
bunch = UnmarshalBunch(data)
|
||||
}
|
||||
|
||||
if bunch == nil {
|
||||
bunch = newBunch
|
||||
} else {
|
||||
bunch = mergeBunch(bunch, newBunch)
|
||||
}
|
||||
|
||||
data = MarshalBunch(bunch)
|
||||
return data
|
||||
}
|
||||
|
||||
func (index *BunchRefCache) Get(id int64) []int64 {
|
||||
keyBuf := idToKeyBuf(index.getBunchId(id))
|
||||
|
||||
data, err := index.db.Get(index.ro, keyBuf)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
if data != nil {
|
||||
for _, idRef := range UnmarshalBunch(data) {
|
||||
if idRef.id == id {
|
||||
return idRef.refs
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func MarshalBunch(idRefs []IdRef) []byte {
|
||||
buf := make([]byte, len(idRefs)*(4+1+6)+binary.MaxVarintLen64)
|
||||
|
||||
lastRef := int64(0)
|
||||
lastId := int64(0)
|
||||
nextPos := 0
|
||||
|
||||
nextPos += binary.PutUvarint(buf[nextPos:], uint64(len(idRefs)))
|
||||
|
||||
for _, idRef := range idRefs {
|
||||
if len(buf)-nextPos < binary.MaxVarintLen64 {
|
||||
tmp := make([]byte, len(buf)*2)
|
||||
copy(tmp, buf)
|
||||
buf = tmp
|
||||
}
|
||||
nextPos += binary.PutVarint(buf[nextPos:], idRef.id-lastId)
|
||||
lastId = idRef.id
|
||||
}
|
||||
for _, idRef := range idRefs {
|
||||
if len(buf)-nextPos < binary.MaxVarintLen64 {
|
||||
tmp := make([]byte, len(buf)*2)
|
||||
copy(tmp, buf)
|
||||
buf = tmp
|
||||
}
|
||||
nextPos += binary.PutUvarint(buf[nextPos:], uint64(len(idRef.refs)))
|
||||
}
|
||||
for _, idRef := range idRefs {
|
||||
for _, ref := range idRef.refs {
|
||||
if len(buf)-nextPos < binary.MaxVarintLen64 {
|
||||
tmp := make([]byte, len(buf)*2)
|
||||
copy(tmp, buf)
|
||||
buf = tmp
|
||||
}
|
||||
nextPos += binary.PutVarint(buf[nextPos:], ref-lastRef)
|
||||
lastRef = ref
|
||||
}
|
||||
}
|
||||
return buf[:nextPos]
|
||||
}
|
||||
|
||||
func UnmarshalBunch(buf []byte) []IdRef {
|
||||
|
||||
r := bytes.NewBuffer(buf)
|
||||
n, err := binary.ReadUvarint(r)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
idRefs := make([]IdRef, n)
|
||||
|
||||
last := int64(0)
|
||||
for i := 0; uint64(i) < n; i++ {
|
||||
idRefs[i].id, err = binary.ReadVarint(r)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
idRefs[i].id += last
|
||||
last = idRefs[i].id
|
||||
}
|
||||
var numRefs uint64
|
||||
for i := 0; uint64(i) < n; i++ {
|
||||
numRefs, err = binary.ReadUvarint(r)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
idRefs[i].refs = make([]int64, numRefs)
|
||||
}
|
||||
last = 0
|
||||
for idIdx := 0; uint64(idIdx) < n; idIdx++ {
|
||||
for refIdx := 0; refIdx < len(idRefs[idIdx].refs); refIdx++ {
|
||||
idRefs[idIdx].refs[refIdx], err = binary.ReadVarint(r)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
idRefs[idIdx].refs[refIdx] += last
|
||||
last = idRefs[idIdx].refs[refIdx]
|
||||
}
|
||||
}
|
||||
return idRefs
|
||||
}
|
Loading…
Reference in New Issue