From d8cfe75b717b969031b9a6a16bd93008b6fe1c1e Mon Sep 17 00:00:00 2001 From: Oliver Tonnhofer Date: Sun, 14 Apr 2013 14:50:38 +0200 Subject: [PATCH] store coords in bunches --- cache/db.go | 22 ++++- cache/delta.go | 239 +++++++++++++++++++++++++++++++++++++++++++++++++ parser.go | 27 +++++- 3 files changed, 284 insertions(+), 4 deletions(-) diff --git a/cache/db.go b/cache/db.go index 5ec41b9..e92372f 100644 --- a/cache/db.go +++ b/cache/db.go @@ -55,12 +55,12 @@ func (p *Cache) PutCoords(nodes []element.Node) { p.db.Write(p.wo, batch) } -func (p *Cache) PutCoordsPacked(nodes []element.Node) { +func (p *Cache) PutCoordsPacked(bunchId int64, nodes []element.Node) { if len(nodes) == 0 { return } keyBuf := make([]byte, 8) - bin.PutVarint(keyBuf, int64(nodes[0].Id)) + bin.PutVarint(keyBuf, bunchId) deltaCoords := packNodes(nodes) data, err := proto.Marshal(deltaCoords) @@ -70,6 +70,24 @@ func (p *Cache) PutCoordsPacked(nodes []element.Node) { p.db.Put(p.wo, keyBuf, data) } +func (p *Cache) GetCoordsPacked(bunchId int64) []element.Node { + keyBuf := make([]byte, 8) + bin.PutVarint(keyBuf, bunchId) + + data, err := p.db.Get(p.ro, keyBuf) + if err != nil { + panic(err) + } + deltaCoords := &DeltaCoords{} + err = proto.Unmarshal(data, deltaCoords) + if err != nil { + panic(err) + } + + nodes := unpackNodes(deltaCoords) + return nodes +} + func (p *Cache) GetCoord(id int64) *element.Node { keyBuf := make([]byte, 8) bin.PutVarint(keyBuf, int64(id)) diff --git a/cache/delta.go b/cache/delta.go index bf07bdc..edd887d 100644 --- a/cache/delta.go +++ b/cache/delta.go @@ -1,8 +1,10 @@ package cache import ( + "container/list" "goposm/binary" "goposm/element" + "sync" ) func packNodes(nodes []element.Node) *DeltaCoords { @@ -27,3 +29,240 @@ func packNodes(nodes []element.Node) *DeltaCoords { } return &DeltaCoords{Ids: ids, Lats: lats, Lons: lons} } + +func unpackNodes(deltaCoords *DeltaCoords) []element.Node { + nodes := make([]element.Node, len(deltaCoords.Ids)) + + var lastLon, lastLat int64 + var lon, lat int64 + var lastId, id int64 + + for i := range nodes { + id = lastId + deltaCoords.Ids[i] + lon = lastLon + deltaCoords.Lats[i] + lat = lastLat + deltaCoords.Lons[i] + nodes[i].Id = id + nodes[i].Long = binary.IntToCoord(uint32(lon)) + nodes[i].Lat = binary.IntToCoord(uint32(lat)) + + lastId = id + lastLon = lon + lastLat = lat + } + return nodes +} + +/* +type DeltaCoordsCache struct { + list *list.List + table map[int64]*list.Item + capacity uint64 +} + +func (d *DeltaCoordsCache) Get(coordId int64) { + coords, ok := d.table[coordId] + if !ok { + return nil + } + +} + +func (d *DeltaCoordsCache) Add(coords *DeltaCoords) { + id := coords[0].Id + id = id / 8196 +} + +type LRUCache struct { + mu sync.Mutex + + // list & table of *entry objects + list *list.List + table map[int64]*list.Element + + // Our current size, in bytes. Obviously a gross simplification and low-grade + // approximation. + size uint64 + + // How many bytes we are limiting the cache to. + capacity uint64 +} + +type Item struct { + Key int64 + Value *list.Element +} + +func NewLRUCache(capacity uint64) *LRUCache { + return &LRUCache{ + list: list.New(), + table: make(map[int64]*list.Element), + capacity: capacity, + } +} + +func (self *LRUCache) Get(key int64) (v interface{}, ok bool) { + self.mu.Lock() + defer self.mu.Unlock() + + element := self.table[key] + if element == nil { + return nil, false + } + self.moveToFront(element) + return element.Value, true +} + +func (self *LRUCache) Set(id int64, value interface{}) { + self.mu.Lock() + defer self.mu.Unlock() + + if element := self.table[id]; element != nil { + self.list.MoveToFront(element) + } else { + self.addNew(id, value) + } +} + +func (self *LRUCache) SetIfAbsent(key string, value Value) { + self.mu.Lock() + defer self.mu.Unlock() + + if element := self.table[key]; element != nil { + self.moveToFront(element) + } else { + self.addNew(key, value) + } +} + +func (self *LRUCache) Delete(key int64) bool { + self.mu.Lock() + defer self.mu.Unlock() + + element := self.table[key] + if element == nil { + return false + } + + self.list.Remove(element) + delete(self.table, key) + return true +} + +func (self *LRUCache) Clear() { + self.mu.Lock() + defer self.mu.Unlock() + + self.list.Init() + self.table = make(map[string]*list.Element) + self.size = 0 +} + +func (self *LRUCache) SetCapacity(capacity uint64) { + self.mu.Lock() + defer self.mu.Unlock() + + self.capacity = capacity + self.checkCapacity() +} + +func (self *LRUCache) addNew(key string, value Value) { + newEntry := &entry{key, value, value.Size(), time.Now()} + element := self.list.PushFront(newEntry) + self.table[key] = element + self.size += uint64(newEntry.size) + self.checkCapacity() +} + +func (self *LRUCache) checkCapacity() { + // Partially duplicated from Delete + for len(self.table) > self.capacity { + delElem := self.list.Back() + delValue := delElem.Value.(*entry) + self.list.Remove(delElem) + delete(self.table, delValue.key) + self.size -= uint64(delValue.size) + } +} +*/ + +type CoordsBunchLRU struct { + list *list.List + table map[int64]*CoordsBunch + capacity int64 + mu sync.Mutex +} + +func NewCoordsBunchLRU(capacity int64) *CoordsBunchLRU { + lru := CoordsBunchLRU{} + lru.list = list.New() + lru.table = make(map[int64]*CoordsBunch) + lru.capacity = capacity + return &lru +} + +type CoordsBunch struct { + id int64 + coords []element.Node + mu sync.RWMutex +} + +type CoordsCache struct { + cache *Cache + lru *CoordsBunchLRU +} + +func NewCoordsCache(cache *Cache) *CoordsCache { + coordsCache := CoordsCache{cache: cache} + coordsCache.lru = NewCoordsBunchLRU(100) + return &coordsCache +} + +func (self *CoordsCache) Close() { + for bunchId, bunch := range self.lru.table { + self.cache.PutCoordsPacked(bunchId, bunch.coords) + } + self.cache.Close() +} +func (self *CoordsCache) PutCoord(node element.Node) { + bunch := self.GetBunch(node.Id) + bunch.mu.Lock() + defer bunch.mu.Unlock() + bunch.coords = append(bunch.coords, node) +} + +func (self *CoordsCache) PutCoords(nodes []element.Node) { + var start, currentBunchId int64 + currentBunchId = BunchId(nodes[0].Id) + start = 0 + for i, node := range nodes { + bunchId := BunchId(node.Id) + if bunchId != currentBunchId { + bunch := self.GetBunch(currentBunchId) + bunch.coords = append(bunch.coords, nodes[start:i-1]...) + currentBunchId = bunchId + start = int64(i) + } + } + bunch := self.GetBunch(currentBunchId) + bunch.coords = append(bunch.coords, nodes[start:]...) +} + +func BunchId(nodeId int64) int64 { + return nodeId / (1024 * 128) +} + +func (self *CoordsCache) GetBunch(bunchId int64) *CoordsBunch { + self.lru.mu.Lock() + defer self.lru.mu.Unlock() + bunch, ok := self.lru.table[bunchId] + if !ok { + nodes := self.cache.GetCoordsPacked(bunchId) + if nodes == nil { + bunch = &CoordsBunch{} + } else { + bunch = &CoordsBunch{id: bunchId, coords: nodes} + } + self.lru.table[bunchId] = bunch + } + return bunch +} diff --git a/parser.go b/parser.go index 1dec5de..d00b424 100644 --- a/parser.go +++ b/parser.go @@ -56,20 +56,43 @@ func parse(filename string) { waitCounter.Done() }() - nodeCache := cache.NewCache("/tmp/goposm/node.cache") + nodeCacheInternal := cache.NewCache("/tmp/goposm/node.cache") + nodeCache := cache.NewCoordsCache(nodeCacheInternal) defer nodeCache.Close() for i := 0; i < 2; i++ { waitCounter.Add(1) go func() { nodeCounter := 0 for nds := range nodes { - nodeCache.PutCoordsPacked(nds) + if len(nds) == 0 { + continue + } + nodeCache.PutCoords(nds) nodeCounter += 1 } fmt.Println("nodes", nodeCounter) waitCounter.Done() }() } + /* + nodeCache := cache.NewCache("/tmp/goposm/node.cache") + defer nodeCache.Close() + for i := 0; i < 2; i++ { + waitCounter.Add(1) + go func() { + nodeCounter := 0 + for nds := range nodes { + if len(nds) == 0 { + continue + } + nodeCache.PutCoordsPacked(nds[0].Id/8196, nds) + nodeCounter += 1 + } + fmt.Println("nodes", nodeCounter) + waitCounter.Done() + }() + } + */ waitParser.Wait() close(nodes) close(ways)