refactored cache API
parent
2ff97757d8
commit
6c5b7e0201
|
@ -84,11 +84,6 @@ func (c *DiffCache) Remove() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
type idRef struct {
|
||||
id int64
|
||||
ref int64
|
||||
}
|
||||
|
||||
const bufferSize = 64 * 1024
|
||||
|
||||
func insertRefs(refs []int64, ref int64) []int64 {
|
||||
|
@ -107,74 +102,79 @@ func insertRefs(refs []int64, ref int64) []int64 {
|
|||
return refs
|
||||
}
|
||||
|
||||
type IdRef struct {
|
||||
type idRef struct {
|
||||
id int64
|
||||
ref int64 // for single id/ref
|
||||
}
|
||||
|
||||
type idRefs struct {
|
||||
id int64
|
||||
refs []int64
|
||||
}
|
||||
|
||||
// IdRefBunch stores multiple IdRefs
|
||||
type IdRefBunch struct {
|
||||
// idRefBunch stores multiple IdRefs
|
||||
type idRefBunch struct {
|
||||
id int64 // the bunch id
|
||||
idRefs []IdRef
|
||||
idRefs []idRefs
|
||||
}
|
||||
|
||||
// IdRefBunches can hold multiple IdRefBunch
|
||||
type IdRefBunches map[int64]IdRefBunch
|
||||
// idRefBunches can hold multiple idRefBunch
|
||||
type idRefBunches map[int64]idRefBunch
|
||||
|
||||
func (bunches *IdRefBunches) add(bunchId, id, ref int64) {
|
||||
func (bunches *idRefBunches) add(bunchId, id, ref int64) {
|
||||
bunch, ok := (*bunches)[bunchId]
|
||||
if !ok {
|
||||
bunch = IdRefBunch{id: bunchId}
|
||||
bunch = idRefBunch{id: bunchId}
|
||||
}
|
||||
var idRef *IdRef
|
||||
var targetIdRefs *idRefs
|
||||
|
||||
i := sort.Search(len(bunch.idRefs), func(i int) bool {
|
||||
return bunch.idRefs[i].id >= id
|
||||
})
|
||||
if i < len(bunch.idRefs) && bunch.idRefs[i].id >= id {
|
||||
if bunch.idRefs[i].id == id {
|
||||
idRef = &bunch.idRefs[i]
|
||||
targetIdRefs = &bunch.idRefs[i]
|
||||
} else {
|
||||
bunch.idRefs = append(bunch.idRefs, IdRef{})
|
||||
bunch.idRefs = append(bunch.idRefs, idRefs{})
|
||||
copy(bunch.idRefs[i+1:], bunch.idRefs[i:])
|
||||
bunch.idRefs[i] = IdRef{id: id}
|
||||
idRef = &bunch.idRefs[i]
|
||||
bunch.idRefs[i] = idRefs{id: id}
|
||||
targetIdRefs = &bunch.idRefs[i]
|
||||
}
|
||||
} else {
|
||||
bunch.idRefs = append(bunch.idRefs, IdRef{id: id})
|
||||
idRef = &bunch.idRefs[len(bunch.idRefs)-1]
|
||||
bunch.idRefs = append(bunch.idRefs, idRefs{id: id})
|
||||
targetIdRefs = &bunch.idRefs[len(bunch.idRefs)-1]
|
||||
}
|
||||
|
||||
idRef.refs = insertRefs(idRef.refs, ref)
|
||||
targetIdRefs.refs = insertRefs(targetIdRefs.refs, ref)
|
||||
(*bunches)[bunchId] = bunch
|
||||
}
|
||||
|
||||
var IdRefBunchesPool chan IdRefBunches
|
||||
var idRefBunchesPool chan idRefBunches
|
||||
|
||||
func init() {
|
||||
IdRefBunchesPool = make(chan IdRefBunches, 1)
|
||||
idRefBunchesPool = make(chan idRefBunches, 1)
|
||||
}
|
||||
|
||||
// BunchRefCache
|
||||
type BunchRefCache struct {
|
||||
// bunchRefCache
|
||||
type bunchRefCache struct {
|
||||
cache
|
||||
buffer IdRefBunches
|
||||
write chan IdRefBunches
|
||||
buffer idRefBunches
|
||||
write chan idRefBunches
|
||||
add chan idRef
|
||||
mu sync.Mutex
|
||||
waitAdd *sync.WaitGroup
|
||||
waitWrite *sync.WaitGroup
|
||||
}
|
||||
|
||||
func newRefIndex(path string, opts *cacheOptions) (*BunchRefCache, error) {
|
||||
index := BunchRefCache{}
|
||||
func newRefIndex(path string, opts *cacheOptions) (*bunchRefCache, error) {
|
||||
index := bunchRefCache{}
|
||||
index.options = opts
|
||||
err := index.open(path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
index.write = make(chan IdRefBunches, 2)
|
||||
index.buffer = make(IdRefBunches, bufferSize)
|
||||
index.write = make(chan idRefBunches, 2)
|
||||
index.buffer = make(idRefBunches, bufferSize)
|
||||
index.add = make(chan idRef, 1024)
|
||||
|
||||
index.waitWrite = &sync.WaitGroup{}
|
||||
|
@ -188,10 +188,10 @@ func newRefIndex(path string, opts *cacheOptions) (*BunchRefCache, error) {
|
|||
}
|
||||
|
||||
type CoordsRefIndex struct {
|
||||
BunchRefCache
|
||||
bunchRefCache
|
||||
}
|
||||
type WaysRefIndex struct {
|
||||
BunchRefCache
|
||||
bunchRefCache
|
||||
}
|
||||
|
||||
func newCoordsRefIndex(dir string) (*CoordsRefIndex, error) {
|
||||
|
@ -212,19 +212,19 @@ func newWaysRefIndex(dir string) (*WaysRefIndex, error) {
|
|||
|
||||
func (index *CoordsRefIndex) AddFromWay(way *element.Way) {
|
||||
for _, node := range way.Nodes {
|
||||
index.add <- idRef{node.Id, way.Id}
|
||||
index.add <- idRef{id: node.Id, ref: way.Id}
|
||||
}
|
||||
}
|
||||
|
||||
func (index *WaysRefIndex) AddFromMembers(relId int64, members []element.Member) {
|
||||
for _, member := range members {
|
||||
if member.Type == element.WAY {
|
||||
index.add <- idRef{member.Id, relId}
|
||||
index.add <- idRef{id: member.Id, ref: relId}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (index *BunchRefCache) writer() {
|
||||
func (index *bunchRefCache) writer() {
|
||||
for buffer := range index.write {
|
||||
if err := index.writeRefs(buffer); err != nil {
|
||||
log.Println("error while writing ref index", err)
|
||||
|
@ -233,7 +233,7 @@ func (index *BunchRefCache) writer() {
|
|||
index.waitWrite.Done()
|
||||
}
|
||||
|
||||
func (index *BunchRefCache) Close() {
|
||||
func (index *bunchRefCache) Close() {
|
||||
close(index.add)
|
||||
index.waitAdd.Wait()
|
||||
close(index.write)
|
||||
|
@ -241,15 +241,15 @@ func (index *BunchRefCache) Close() {
|
|||
index.cache.Close()
|
||||
}
|
||||
|
||||
func (index *BunchRefCache) dispatch() {
|
||||
func (index *bunchRefCache) dispatch() {
|
||||
for idRef := range index.add {
|
||||
index.buffer.add(index.getBunchId(idRef.id), idRef.id, idRef.ref)
|
||||
if len(index.buffer) >= bufferSize {
|
||||
index.write <- index.buffer
|
||||
select {
|
||||
case index.buffer = <-IdRefBunchesPool:
|
||||
case index.buffer = <-idRefBunchesPool:
|
||||
default:
|
||||
index.buffer = make(IdRefBunches, bufferSize)
|
||||
index.buffer = make(idRefBunches, bufferSize)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -260,19 +260,19 @@ func (index *BunchRefCache) dispatch() {
|
|||
index.waitAdd.Done()
|
||||
}
|
||||
|
||||
func (index *BunchRefCache) AddFromWay(way *element.Way) {
|
||||
func (index *bunchRefCache) AddFromWay(way *element.Way) {
|
||||
for _, node := range way.Nodes {
|
||||
index.add <- idRef{node.Id, way.Id}
|
||||
index.add <- idRef{id: node.Id, ref: way.Id}
|
||||
}
|
||||
}
|
||||
|
||||
func (index *BunchRefCache) getBunchId(id int64) int64 {
|
||||
func (index *bunchRefCache) getBunchId(id int64) int64 {
|
||||
return id / 64
|
||||
}
|
||||
|
||||
type loadBunchItem struct {
|
||||
bunchId int64
|
||||
bunch IdRefBunch
|
||||
bunch idRefBunch
|
||||
}
|
||||
|
||||
type writeBunchItem struct {
|
||||
|
@ -280,7 +280,7 @@ type writeBunchItem struct {
|
|||
data []byte
|
||||
}
|
||||
|
||||
func (index *BunchRefCache) writeRefs(idRefs IdRefBunches) error {
|
||||
func (index *bunchRefCache) writeRefs(idRefs idRefBunches) error {
|
||||
batch := levigo.NewWriteBatch()
|
||||
defer batch.Close()
|
||||
|
||||
|
@ -320,17 +320,17 @@ func (index *BunchRefCache) writeRefs(idRefs IdRefBunches) error {
|
|||
delete(idRefs, k)
|
||||
}
|
||||
select {
|
||||
case IdRefBunchesPool <- idRefs:
|
||||
case idRefBunchesPool <- idRefs:
|
||||
}
|
||||
}()
|
||||
return index.db.Write(index.wo, batch)
|
||||
}
|
||||
|
||||
func mergeBunch(bunch, newBunch []IdRef) []IdRef {
|
||||
func mergeBunch(bunch, newBunch []idRefs) []idRefs {
|
||||
lastIdx := 0
|
||||
|
||||
NextIdRef:
|
||||
// for each new IdRef...
|
||||
// for each new idRef...
|
||||
for _, newIdRefs := range newBunch {
|
||||
// search place in bunch
|
||||
for i := lastIdx; i < len(bunch); i++ {
|
||||
|
@ -344,7 +344,7 @@ NextIdRef:
|
|||
}
|
||||
if bunch[i].id > newIdRefs.id {
|
||||
// insert before
|
||||
bunch = append(bunch, IdRef{})
|
||||
bunch = append(bunch, idRefs{})
|
||||
copy(bunch[i+1:], bunch[i:])
|
||||
bunch[i] = newIdRefs
|
||||
lastIdx = i
|
||||
|
@ -358,16 +358,16 @@ NextIdRef:
|
|||
return bunch
|
||||
}
|
||||
|
||||
func (index *BunchRefCache) loadMergeMarshal(keyBuf []byte, newBunch []IdRef) []byte {
|
||||
func (index *bunchRefCache) loadMergeMarshal(keyBuf []byte, newBunch []idRefs) []byte {
|
||||
data, err := index.db.Get(index.ro, keyBuf)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
var bunch []IdRef
|
||||
var bunch []idRefs
|
||||
|
||||
if data != nil {
|
||||
bunch = UnmarshalBunch(data)
|
||||
bunch = unmarshalBunch(data)
|
||||
}
|
||||
|
||||
if bunch == nil {
|
||||
|
@ -376,11 +376,11 @@ func (index *BunchRefCache) loadMergeMarshal(keyBuf []byte, newBunch []IdRef) []
|
|||
bunch = mergeBunch(bunch, newBunch)
|
||||
}
|
||||
|
||||
data = MarshalBunch(bunch)
|
||||
data = marshalBunch(bunch)
|
||||
return data
|
||||
}
|
||||
|
||||
func (index *BunchRefCache) Get(id int64) []int64 {
|
||||
func (index *bunchRefCache) Get(id int64) []int64 {
|
||||
keyBuf := idToKeyBuf(index.getBunchId(id))
|
||||
|
||||
data, err := index.db.Get(index.ro, keyBuf)
|
||||
|
@ -389,7 +389,7 @@ func (index *BunchRefCache) Get(id int64) []int64 {
|
|||
}
|
||||
|
||||
if data != nil {
|
||||
for _, idRef := range UnmarshalBunch(data) {
|
||||
for _, idRef := range unmarshalBunch(data) {
|
||||
if idRef.id == id {
|
||||
return idRef.refs
|
||||
}
|
||||
|
@ -398,7 +398,7 @@ func (index *BunchRefCache) Get(id int64) []int64 {
|
|||
return nil
|
||||
}
|
||||
|
||||
func MarshalBunch(idRefs []IdRef) []byte {
|
||||
func marshalBunch(idRefs []idRefs) []byte {
|
||||
buf := make([]byte, len(idRefs)*(4+1+6)+binary.MaxVarintLen64)
|
||||
|
||||
lastRef := int64(0)
|
||||
|
@ -438,7 +438,7 @@ func MarshalBunch(idRefs []IdRef) []byte {
|
|||
return buf[:nextPos]
|
||||
}
|
||||
|
||||
func UnmarshalBunch(buf []byte) []IdRef {
|
||||
func unmarshalBunch(buf []byte) []idRefs {
|
||||
|
||||
r := bytes.NewBuffer(buf)
|
||||
n, err := binary.ReadUvarint(r)
|
||||
|
@ -446,7 +446,7 @@ func UnmarshalBunch(buf []byte) []IdRef {
|
|||
return nil
|
||||
}
|
||||
|
||||
idRefs := make([]IdRef, n)
|
||||
idRefs := make([]idRefs, n)
|
||||
|
||||
last := int64(0)
|
||||
for i := 0; uint64(i) < n; i++ {
|
||||
|
|
|
@ -76,8 +76,8 @@ func TestWriteDiff(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestMarshalBunch(t *testing.T) {
|
||||
bunch := []IdRef{
|
||||
func TestmarshalBunch(t *testing.T) {
|
||||
bunch := []idRefs{
|
||||
{123923123, []int64{1213123}},
|
||||
{123923133, []int64{1231237}},
|
||||
{123924123, []int64{912412210, 912412213}},
|
||||
|
@ -86,8 +86,8 @@ func TestMarshalBunch(t *testing.T) {
|
|||
{123924132, []int64{912412210, 9124213, 212412210}},
|
||||
}
|
||||
|
||||
buf := MarshalBunch(bunch)
|
||||
newBunch := UnmarshalBunch(buf)
|
||||
buf := marshalBunch(bunch)
|
||||
newBunch := unmarshalBunch(buf)
|
||||
|
||||
t.Log(len(buf), float64(len(buf))/6.0)
|
||||
|
||||
|
@ -109,7 +109,7 @@ func TestMarshalBunch(t *testing.T) {
|
|||
}
|
||||
|
||||
func BenchmarkMarshalBunch(b *testing.B) {
|
||||
bunch := []IdRef{
|
||||
bunch := []idRefs{
|
||||
{123923123, []int64{1213123}},
|
||||
{123923133, []int64{1231237}},
|
||||
{123924123, []int64{912412210, 912412213}},
|
||||
|
@ -119,8 +119,8 @@ func BenchmarkMarshalBunch(b *testing.B) {
|
|||
}
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
buf := MarshalBunch(bunch)
|
||||
UnmarshalBunch(buf)
|
||||
buf := marshalBunch(bunch)
|
||||
unmarshalBunch(buf)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -147,33 +147,33 @@ func BenchmarkWriteDiff(b *testing.B) {
|
|||
}
|
||||
|
||||
func TestMergeIdRefs(t *testing.T) {
|
||||
bunch := []IdRef{}
|
||||
bunch := []idRefs{}
|
||||
|
||||
bunch = mergeBunch(bunch, []IdRef{IdRef{50, []int64{1}}})
|
||||
bunch = mergeBunch(bunch, []idRefs{idRefs{50, []int64{1}}})
|
||||
if b := bunch[0]; b.id != 50 || b.refs[0] != 1 {
|
||||
t.Fatal(bunch)
|
||||
}
|
||||
|
||||
// before
|
||||
bunch = mergeBunch(bunch, []IdRef{IdRef{40, []int64{3}}})
|
||||
bunch = mergeBunch(bunch, []idRefs{idRefs{40, []int64{3}}})
|
||||
if b := bunch[0]; b.id != 40 || b.refs[0] != 3 {
|
||||
t.Fatal(bunch)
|
||||
}
|
||||
|
||||
// after
|
||||
bunch = mergeBunch(bunch, []IdRef{IdRef{70, []int64{4}}})
|
||||
bunch = mergeBunch(bunch, []idRefs{idRefs{70, []int64{4}}})
|
||||
if b := bunch[2]; b.id != 70 || b.refs[0] != 4 {
|
||||
t.Fatal(bunch)
|
||||
}
|
||||
|
||||
// in between
|
||||
bunch = mergeBunch(bunch, []IdRef{IdRef{60, []int64{5}}})
|
||||
bunch = mergeBunch(bunch, []idRefs{idRefs{60, []int64{5}}})
|
||||
if b := bunch[2]; b.id != 60 || b.refs[0] != 5 {
|
||||
t.Fatal(bunch)
|
||||
}
|
||||
|
||||
// same (50:1 already inserted)
|
||||
bunch = mergeBunch(bunch, []IdRef{IdRef{50, []int64{0, 5}}})
|
||||
bunch = mergeBunch(bunch, []idRefs{idRefs{50, []int64{0, 5}}})
|
||||
if b := bunch[1]; b.id != 50 || len(b.refs) != 3 ||
|
||||
b.refs[0] != 0 || b.refs[1] != 1 || b.refs[2] != 5 {
|
||||
t.Fatal(bunch)
|
||||
|
@ -185,7 +185,7 @@ func TestMergeIdRefs(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestIdRefBunches(t *testing.T) {
|
||||
bunches := make(IdRefBunches)
|
||||
bunches := make(idRefBunches)
|
||||
bunches.add(1, 100, 999)
|
||||
|
||||
if r := bunches[1].idRefs[0]; r.id != 100 || r.refs[0] != 999 {
|
||||
|
|
Loading…
Reference in New Issue