DeltaCoordsCache with LRU cache

Branch: master
Author: Oliver Tonnhofer, 2013-04-15 21:54:48 +02:00
Parent: d8cfe75b71
Commit: 064f7df428
3 changed files with 182 additions and 246 deletions

cache/db.go (98 lines changed)

@@ -1,7 +1,6 @@
package cache
import (
"code.google.com/p/goprotobuf/proto"
bin "encoding/binary"
"github.com/jmhodges/levigo"
"goposm/binary"
@@ -14,8 +13,8 @@ type Cache struct {
ro *levigo.ReadOptions
}
func NewCache(path string) *Cache {
result := &Cache{}
func NewCache(path string) Cache {
result := Cache{}
opts := levigo.NewOptions()
opts.SetCache(levigo.NewLRUCache(1024 * 1024 * 50))
opts.SetCreateIfMissing(true)
@@ -29,7 +28,47 @@ func NewCache(path string) *Cache {
return result
}
func (p *Cache) PutCoord(node *element.Node) {
type NodesCache struct {
Cache
}
func NewNodesCache(path string) *NodesCache {
cache := NewCache(path)
nodesCache := NodesCache{cache}
return &nodesCache
}
type CoordsCache struct {
Cache
}
func NewCoordsCache(path string) *CoordsCache {
cache := NewCache(path)
coordsCache := CoordsCache{cache}
return &coordsCache
}
type WaysCache struct {
Cache
}
func NewWaysCache(path string) *WaysCache {
cache := NewCache(path)
waysCache := WaysCache{cache}
return &waysCache
}
type RelationsCache struct {
Cache
}
func NewRelationsCache(path string) *RelationsCache {
cache := NewCache(path)
relationsCache := RelationsCache{cache}
return &relationsCache
}
func (p *CoordsCache) PutCoord(node *element.Node) {
keyBuf := make([]byte, 8)
bin.PutVarint(keyBuf, int64(node.Id))
data, err := binary.MarshalCoord(node)
@@ -39,7 +78,7 @@ func (p *Cache) PutCoord(node *element.Node) {
p.db.Put(p.wo, keyBuf, data)
}
func (p *Cache) PutCoords(nodes []element.Node) {
func (p *CoordsCache) PutCoords(nodes []element.Node) {
batch := levigo.NewWriteBatch()
defer batch.Close()
@@ -55,40 +94,7 @@ func (p *Cache) PutCoords(nodes []element.Node) {
p.db.Write(p.wo, batch)
}
func (p *Cache) PutCoordsPacked(bunchId int64, nodes []element.Node) {
if len(nodes) == 0 {
return
}
keyBuf := make([]byte, 8)
bin.PutVarint(keyBuf, bunchId)
deltaCoords := packNodes(nodes)
data, err := proto.Marshal(deltaCoords)
if err != nil {
panic(err)
}
p.db.Put(p.wo, keyBuf, data)
}
func (p *Cache) GetCoordsPacked(bunchId int64) []element.Node {
keyBuf := make([]byte, 8)
bin.PutVarint(keyBuf, bunchId)
data, err := p.db.Get(p.ro, keyBuf)
if err != nil {
panic(err)
}
deltaCoords := &DeltaCoords{}
err = proto.Unmarshal(data, deltaCoords)
if err != nil {
panic(err)
}
nodes := unpackNodes(deltaCoords)
return nodes
}
func (p *Cache) GetCoord(id int64) *element.Node {
func (p *CoordsCache) GetCoord(id int64) *element.Node {
keyBuf := make([]byte, 8)
bin.PutVarint(keyBuf, int64(id))
data, err := p.db.Get(p.ro, keyBuf)
@@ -106,7 +112,7 @@ func (p *Cache) GetCoord(id int64) *element.Node {
return node
}
func (p *Cache) PutNode(node *element.Node) {
func (p *NodesCache) PutNode(node *element.Node) {
keyBuf := make([]byte, 8)
bin.PutVarint(keyBuf, int64(node.Id))
data, err := binary.MarshalNode(node)
@@ -116,7 +122,7 @@ func (p *Cache) PutNode(node *element.Node) {
p.db.Put(p.wo, keyBuf, data)
}
func (p *Cache) GetNode(id int64) *element.Node {
func (p *NodesCache) GetNode(id int64) *element.Node {
keyBuf := make([]byte, 8)
bin.PutVarint(keyBuf, int64(id))
data, err := p.db.Get(p.ro, keyBuf)
@@ -133,7 +139,7 @@ func (p *Cache) GetNode(id int64) *element.Node {
return node
}
func (p *Cache) PutWay(way *element.Way) {
func (p *WaysCache) PutWay(way *element.Way) {
keyBuf := make([]byte, 8)
bin.PutVarint(keyBuf, int64(way.Id))
data, err := binary.MarshalWay(way)
@@ -143,7 +149,7 @@ func (p *Cache) PutWay(way *element.Way) {
p.db.Put(p.wo, keyBuf, data)
}
func (p *Cache) PutWays(ways []element.Way) {
func (p *WaysCache) PutWays(ways []element.Way) {
batch := levigo.NewWriteBatch()
defer batch.Close()
@@ -159,7 +165,7 @@ func (p *Cache) PutWays(ways []element.Way) {
p.db.Write(p.wo, batch)
}
func (p *Cache) GetWay(id int64) *element.Way {
func (p *WaysCache) GetWay(id int64) *element.Way {
keyBuf := make([]byte, 8)
bin.PutVarint(keyBuf, int64(id))
data, err := p.db.Get(p.ro, keyBuf)
@@ -176,7 +182,7 @@ func (p *Cache) GetWay(id int64) *element.Way {
return way
}
func (p *Cache) PutRelation(relation *element.Relation) {
func (p *RelationsCache) PutRelation(relation *element.Relation) {
keyBuf := make([]byte, 8)
bin.PutVarint(keyBuf, int64(relation.Id))
data, err := binary.MarshalRelation(relation)
@@ -186,7 +192,7 @@ func (p *Cache) PutRelation(relation *element.Relation) {
p.db.Put(p.wo, keyBuf, data)
}
func (p *Cache) GetRelation(id int64) *element.Relation {
func (p *RelationsCache) GetRelation(id int64) *element.Relation {
keyBuf := make([]byte, 8)
bin.PutVarint(keyBuf, int64(id))
data, err := p.db.Get(p.ro, keyBuf)
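
For orientation, a minimal usage sketch of the per-type caches introduced above. It relies only on the constructors and methods visible in this diff (NewCoordsCache, PutCoord, GetCoord, Close); the path and node values are illustrative, not taken from the project.

package main

import (
	"fmt"

	"goposm/cache"
	"goposm/element"
)

func main() {
	// Each OSM element type now gets its own cache type backed by a LevelDB.
	coords := cache.NewCoordsCache("/tmp/goposm/coords.cache") // illustrative path
	defer coords.Close()

	node := element.Node{
		OSMElem: element.OSMElem{Id: 12345},
		Long:    8.15,
		Lat:     53.2,
	}
	coords.PutCoord(&node)

	if got := coords.GetCoord(12345); got != nil {
		fmt.Println(got.Id, got.Long, got.Lat)
	}
}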

cache/delta.go (323 lines changed)

@@ -1,22 +1,24 @@
package cache
import (
"code.google.com/p/goprotobuf/proto"
"container/list"
bin "encoding/binary"
"goposm/binary"
"goposm/element"
"sync"
)
func packNodes(nodes []element.Node) *DeltaCoords {
func packNodes(nodes map[int64]element.Node) *DeltaCoords {
var lastLon, lastLat int64
var lon, lat int64
var lastId, id int64
var lastId int64
ids := make([]int64, len(nodes))
lons := make([]int64, len(nodes))
lats := make([]int64, len(nodes))
for i, nd := range nodes {
id = nd.Id
i := 0
for id, nd := range nodes {
lon = int64(binary.CoordToInt(nd.Long))
lat = int64(binary.CoordToInt(nd.Lat))
ids[i] = id - lastId
@@ -26,24 +28,27 @@ func packNodes(nodes []element.Node) *DeltaCoords {
lastId = id
lastLon = lon
lastLat = lat
i++
}
return &DeltaCoords{Ids: ids, Lats: lats, Lons: lons}
}
func unpackNodes(deltaCoords *DeltaCoords) []element.Node {
nodes := make([]element.Node, len(deltaCoords.Ids))
func unpackNodes(deltaCoords *DeltaCoords) map[int64]element.Node {
nodes := make(map[int64]element.Node, len(deltaCoords.Ids))
var lastLon, lastLat int64
var lon, lat int64
var lastId, id int64
for i := range nodes {
for i := 0; i < len(deltaCoords.Ids); i++ {
id = lastId + deltaCoords.Ids[i]
lon = lastLon + deltaCoords.Lats[i]
lat = lastLat + deltaCoords.Lons[i]
nodes[i].Id = id
nodes[i].Long = binary.IntToCoord(uint32(lon))
nodes[i].Lat = binary.IntToCoord(uint32(lat))
nodes[id] = element.Node{
OSMElem: element.OSMElem{Id: int64(id)},
Long: binary.IntToCoord(uint32(lon)),
Lat: binary.IntToCoord(uint32(lat)),
}
lastId = id
lastLon = lon
@@ -52,217 +57,143 @@ func unpackNodes(deltaCoords *DeltaCoords) []element.Node {
return nodes
}
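
The pack/unpack pair above stores deltas rather than absolute values: ids and projected coordinates of nodes in a bunch tend to lie close together, so the differences encode into short protobuf varints. A toy illustration of the id encoding (values are made up):

package main

import "fmt"

func main() {
	ids := []int64{100000, 100001, 100003}
	deltas := make([]int64, len(ids))
	var last int64
	for i, id := range ids {
		deltas[i] = id - last // same scheme as ids[i] = id - lastId in packNodes
		last = id
	}
	fmt.Println(deltas) // [100000 1 2]; unpackNodes restores the ids with a running sum
}
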
/*
type CoordsBunch struct {
sync.Mutex
id int64
coords map[int64]element.Node
elem *list.Element
needsWrite bool
}
type DeltaCoordsCache struct {
list *list.List
table map[int64]*list.Item
capacity uint64
}
func (d *DeltaCoordsCache) Get(coordId int64) {
coords, ok := d.table[coordId]
if !ok {
return nil
}
}
func (d *DeltaCoordsCache) Add(coords *DeltaCoords) {
id := coords[0].Id
id = id / 8196
}
type LRUCache struct {
mu sync.Mutex
// list & table of *entry objects
list *list.List
table map[int64]*list.Element
// Our current size, in bytes. Obviously a gross simplification and low-grade
// approximation.
size uint64
// How many bytes we are limiting the cache to.
capacity uint64
}
type Item struct {
Key int64
Value *list.Element
}
func NewLRUCache(capacity uint64) *LRUCache {
return &LRUCache{
list: list.New(),
table: make(map[int64]*list.Element),
capacity: capacity,
}
}
func (self *LRUCache) Get(key int64) (v interface{}, ok bool) {
self.mu.Lock()
defer self.mu.Unlock()
element := self.table[key]
if element == nil {
return nil, false
}
self.moveToFront(element)
return element.Value, true
}
func (self *LRUCache) Set(id int64, value interface{}) {
self.mu.Lock()
defer self.mu.Unlock()
if element := self.table[id]; element != nil {
self.list.MoveToFront(element)
} else {
self.addNew(id, value)
}
}
func (self *LRUCache) SetIfAbsent(key string, value Value) {
self.mu.Lock()
defer self.mu.Unlock()
if element := self.table[key]; element != nil {
self.moveToFront(element)
} else {
self.addNew(key, value)
}
}
func (self *LRUCache) Delete(key int64) bool {
self.mu.Lock()
defer self.mu.Unlock()
element := self.table[key]
if element == nil {
return false
}
self.list.Remove(element)
delete(self.table, key)
return true
}
func (self *LRUCache) Clear() {
self.mu.Lock()
defer self.mu.Unlock()
self.list.Init()
self.table = make(map[string]*list.Element)
self.size = 0
}
func (self *LRUCache) SetCapacity(capacity uint64) {
self.mu.Lock()
defer self.mu.Unlock()
self.capacity = capacity
self.checkCapacity()
}
func (self *LRUCache) addNew(key string, value Value) {
newEntry := &entry{key, value, value.Size(), time.Now()}
element := self.list.PushFront(newEntry)
self.table[key] = element
self.size += uint64(newEntry.size)
self.checkCapacity()
}
func (self *LRUCache) checkCapacity() {
// Partially duplicated from Delete
for len(self.table) > self.capacity {
delElem := self.list.Back()
delValue := delElem.Value.(*entry)
self.list.Remove(delElem)
delete(self.table, delValue.key)
self.size -= uint64(delValue.size)
}
}
*/
type CoordsBunchLRU struct {
list *list.List
Cache
lruList *list.List
table map[int64]*CoordsBunch
capacity int64
mu sync.Mutex
}
func NewCoordsBunchLRU(capacity int64) *CoordsBunchLRU {
lru := CoordsBunchLRU{}
lru.list = list.New()
lru.table = make(map[int64]*CoordsBunch)
lru.capacity = capacity
return &lru
}
type CoordsBunch struct {
id int64
coords []element.Node
mu sync.RWMutex
}
type CoordsCache struct {
cache *Cache
lru *CoordsBunchLRU
}
func NewCoordsCache(cache *Cache) *CoordsCache {
coordsCache := CoordsCache{cache: cache}
coordsCache.lru = NewCoordsBunchLRU(100)
func NewDeltaCoordsCache(path string) *DeltaCoordsCache {
cache := NewCache(path)
coordsCache := DeltaCoordsCache{}
coordsCache.Cache = cache
coordsCache.lruList = list.New()
coordsCache.table = make(map[int64]*CoordsBunch)
coordsCache.capacity = 100
return &coordsCache
}
func (self *CoordsCache) Close() {
for bunchId, bunch := range self.lru.table {
self.cache.PutCoordsPacked(bunchId, bunch.coords)
func (self *DeltaCoordsCache) Close() {
for getBunchId, bunch := range self.table {
if bunch.needsWrite {
self.putCoordsPacked(getBunchId, bunch.coords)
}
}
self.cache.Close()
}
func (self *CoordsCache) PutCoord(node element.Node) {
bunch := self.GetBunch(node.Id)
bunch.mu.Lock()
defer bunch.mu.Unlock()
bunch.coords = append(bunch.coords, node)
self.Cache.Close()
}
func (self *CoordsCache) PutCoords(nodes []element.Node) {
var start, currentBunchId int64
currentBunchId = BunchId(nodes[0].Id)
func (self *DeltaCoordsCache) GetCoord(id int64) (element.Node, bool) {
getBunchId := getBunchId(id)
bunch := self.getBunch(getBunchId)
defer bunch.Unlock()
node, ok := bunch.coords[id]
if !ok {
return element.Node{}, false
}
return node, true
}
func (self *DeltaCoordsCache) PutCoords(nodes []element.Node) {
var start, currentgetBunchId int64
currentgetBunchId = getBunchId(nodes[0].Id)
start = 0
for i, node := range nodes {
bunchId := BunchId(node.Id)
if bunchId != currentBunchId {
bunch := self.GetBunch(currentBunchId)
bunch.coords = append(bunch.coords, nodes[start:i-1]...)
currentBunchId = bunchId
getBunchId := getBunchId(node.Id)
if getBunchId != currentgetBunchId {
bunch := self.getBunch(currentgetBunchId)
for _, nd := range nodes[start : i-1] {
bunch.coords[nd.Id] = nd
}
currentgetBunchId = getBunchId
start = int64(i)
bunch.needsWrite = true
bunch.Unlock()
}
}
bunch := self.GetBunch(currentBunchId)
bunch.coords = append(bunch.coords, nodes[start:]...)
bunch := self.getBunch(currentgetBunchId)
for _, nd := range nodes[start:] {
bunch.coords[nd.Id] = nd
}
bunch.needsWrite = true
bunch.Unlock()
}
func BunchId(nodeId int64) int64 {
return nodeId / (1024 * 128)
func (p *DeltaCoordsCache) putCoordsPacked(getBunchId int64, nodes map[int64]element.Node) {
if len(nodes) == 0 {
return
}
keyBuf := make([]byte, 8)
bin.PutVarint(keyBuf, getBunchId)
deltaCoords := packNodes(nodes)
data, err := proto.Marshal(deltaCoords)
if err != nil {
panic(err)
}
p.db.Put(p.wo, keyBuf, data)
}
func (self *CoordsCache) GetBunch(bunchId int64) *CoordsBunch {
self.lru.mu.Lock()
defer self.lru.mu.Unlock()
bunch, ok := self.lru.table[bunchId]
func (p *DeltaCoordsCache) getCoordsPacked(getBunchId int64) map[int64]element.Node {
keyBuf := make([]byte, 8)
bin.PutVarint(keyBuf, getBunchId)
data, err := p.db.Get(p.ro, keyBuf)
if err != nil {
panic(err)
}
deltaCoords := &DeltaCoords{}
err = proto.Unmarshal(data, deltaCoords)
if err != nil {
panic(err)
}
nodes := unpackNodes(deltaCoords)
return nodes
}
func getBunchId(nodeId int64) int64 {
return nodeId / (1024 * 32)
}
func (self *DeltaCoordsCache) getBunch(getBunchId int64) *CoordsBunch {
self.mu.Lock()
defer self.mu.Unlock()
bunch, ok := self.table[getBunchId]
if !ok {
nodes := self.cache.GetCoordsPacked(bunchId)
elem := self.lruList.PushFront(getBunchId)
nodes := self.getCoordsPacked(getBunchId)
if nodes == nil {
bunch = &CoordsBunch{}
bunch = &CoordsBunch{elem: elem}
} else {
bunch = &CoordsBunch{id: bunchId, coords: nodes}
bunch = &CoordsBunch{id: getBunchId, coords: nodes, elem: elem}
}
self.lru.table[bunchId] = bunch
self.table[getBunchId] = bunch
} else {
self.lruList.MoveToFront(bunch.elem)
}
bunch.Lock()
self.CheckCapacity()
return bunch
}
func (self *DeltaCoordsCache) CheckCapacity() {
for int64(len(self.table)) > self.capacity {
elem := self.lruList.Back()
getBunchId := self.lruList.Remove(elem).(int64)
bunch := self.table[getBunchId]
if bunch.needsWrite {
self.putCoordsPacked(getBunchId, bunch.coords)
}
delete(self.table, getBunchId)
}
}
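
Putting the new pieces together, a sketch of how the DeltaCoordsCache is meant to be used. It groups node ids into bunches (getBunchId divides by 1024 * 32), keeps up to 100 recently used bunches in memory via the LRU list, and writes modified bunches back as delta-encoded protobufs on eviction or Close. The path and node values below are illustrative.

package main

import (
	"fmt"

	"goposm/cache"
	"goposm/element"
)

func main() {
	coords := cache.NewDeltaCoordsCache("/tmp/goposm/node.cache") // illustrative path
	defer coords.Close() // flushes bunches that still need a write

	nodes := []element.Node{
		{OSMElem: element.OSMElem{Id: 1000}, Long: 8.1, Lat: 53.2},
		{OSMElem: element.OSMElem{Id: 1001}, Long: 8.2, Lat: 53.3},
	}
	coords.PutCoords(nodes) // both nodes land in the same bunch

	if nd, ok := coords.GetCoord(1001); ok {
		fmt.Println(nd.Id, nd.Long, nd.Lat)
	}
}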


@@ -29,7 +29,7 @@ func parse(filename string) {
}
waitCounter := sync.WaitGroup{}
wayCache := cache.NewCache("/tmp/goposm/way.cache")
wayCache := cache.NewWaysCache("/tmp/goposm/way.cache")
defer wayCache.Close()
for i := 0; i < 2; i++ {
waitCounter.Add(1)
@@ -43,7 +43,7 @@ func parse(filename string) {
waitCounter.Done()
}()
}
relCache := cache.NewCache("/tmp/goposm/relation.cache")
relCache := cache.NewRelationsCache("/tmp/goposm/relation.cache")
defer relCache.Close()
waitCounter.Add(1)
go func() {
@@ -56,8 +56,7 @@ func parse(filename string) {
waitCounter.Done()
}()
nodeCacheInternal := cache.NewCache("/tmp/goposm/node.cache")
nodeCache := cache.NewCoordsCache(nodeCacheInternal)
nodeCache := cache.NewDeltaCoordsCache("/tmp/goposm/node.cache")
defer nodeCache.Close()
for i := 0; i < 2; i++ {
waitCounter.Add(1)
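
The parser now wires each element stream to its dedicated cache. A rough sketch of that pattern, based on the snippets above; the ways channel and its producer are assumptions, only NewWaysCache, PutWays, Close, and the WaitGroup usage come from the diff.

package main

import (
	"sync"

	"goposm/cache"
	"goposm/element"
)

func main() {
	wayCache := cache.NewWaysCache("/tmp/goposm/way.cache")
	defer wayCache.Close()

	ways := make(chan []element.Way) // hypothetical channel fed by the PBF parser
	waitCounter := sync.WaitGroup{}
	for i := 0; i < 2; i++ {
		waitCounter.Add(1)
		go func() {
			for ws := range ways {
				wayCache.PutWays(ws)
			}
			waitCounter.Done()
		}()
	}

	close(ways) // no producer in this sketch
	waitCounter.Wait()
}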