limit ways and relations as well

only cache ways if the first coord is cached, only cache relations
if the first member is cached.

we need to sync the read process so that we have all coords cached
before the first way gets checked for limitto.
master
Oliver Tonnhofer 2013-11-12 09:21:17 +01:00
parent 8959147ccf
commit 9d02c24ea0
8 changed files with 222 additions and 40 deletions

11
cache/delta.go vendored
View File

@ -368,3 +368,14 @@ func (self *DeltaCoordsCache) CheckCapacity() {
delete(self.table, bunchId)
}
}
func (self *DeltaCoordsCache) FirstRefIsCached(refs []int64) bool {
if len(refs) <= 0 {
return false
}
_, err := self.GetCoord(refs[0])
if err != nil {
return false
}
return true
}

6
cache/relations.go vendored
View File

@ -21,6 +21,9 @@ func newRelationsCache(path string) (*RelationsCache, error) {
}
func (p *RelationsCache) PutRelation(relation *element.Relation) error {
if relation.Id == SKIP {
return nil
}
keyBuf := idToKeyBuf(relation.Id)
data, err := binary.MarshalRelation(relation)
if err != nil {
@ -34,6 +37,9 @@ func (p *RelationsCache) PutRelations(rels []element.Relation) error {
defer batch.Close()
for _, rel := range rels {
if rel.Id == SKIP {
continue
}
if len(rel.Tags) == 0 {
continue
}

19
cache/ways.go vendored
View File

@ -21,6 +21,9 @@ func newWaysCache(path string) (*WaysCache, error) {
}
func (p *WaysCache) PutWay(way *element.Way) error {
if way.Id == SKIP {
return nil
}
keyBuf := idToKeyBuf(way.Id)
data, err := binary.MarshalWay(way)
if err != nil {
@ -34,6 +37,9 @@ func (p *WaysCache) PutWays(ways []element.Way) error {
defer batch.Close()
for _, way := range ways {
if way.Id == SKIP {
continue
}
keyBuf := idToKeyBuf(way.Id)
data, err := binary.MarshalWay(&way)
if err != nil {
@ -107,6 +113,19 @@ func (self *WaysCache) FillMembers(members []element.Member) error {
return nil
}
func (self *WaysCache) FirstMemberIsCached(members []element.Member) bool {
for _, m := range members {
if m.Type == element.WAY {
_, err := self.GetWay(m.Id)
if err != nil {
return false
}
return true
}
}
return false
}
type InsertedWaysCache struct {
cache
}

View File

@ -159,14 +159,14 @@ For:
if elem.Rel != nil {
// check if first member is cached to avoid caching
// unneeded relations (typical outside of our coverage)
if memberIsCached(elem.Rel.Members, osmCache.Ways) {
if osmCache.Ways.FirstMemberIsCached(elem.Rel.Members) {
osmCache.Relations.PutRelation(elem.Rel)
relIds[elem.Rel.Id] = true
}
} else if elem.Way != nil {
// check if first coord is cached to avoid caching
// unneeded ways (typical outside of our coverage)
if coordIsCached(elem.Way.Refs, osmCache.Coords) {
if osmCache.Coords.FirstRefIsCached(elem.Way.Refs) {
osmCache.Ways.PutWay(elem.Way)
wayIds[elem.Way.Id] = true
}
@ -291,27 +291,3 @@ For:
}
}
}
func memberIsCached(members []element.Member, wayCache *cache.WaysCache) bool {
for _, m := range members {
if m.Type == element.WAY {
_, err := wayCache.GetWay(m.Id)
if err != nil {
return false
}
return true
}
}
return false
}
func coordIsCached(refs []int64, coordCache *cache.DeltaCoordsCache) bool {
if len(refs) <= 0 {
return false
}
_, err := coordCache.GetCoord(refs[0])
if err != nil {
return false
}
return true
}

View File

@ -2,6 +2,7 @@ package pbf
import (
"imposm3/element"
"imposm3/util"
"runtime"
"sync"
)
@ -14,10 +15,20 @@ type parser struct {
relations chan []element.Relation
nParser int
wg sync.WaitGroup
waySync *util.SyncPoint
relSync *util.SyncPoint
}
func NewParser(pbf *Pbf, coords chan []element.Node, nodes chan []element.Node, ways chan []element.Way, relations chan []element.Relation) *parser {
return &parser{pbf, coords, nodes, ways, relations, runtime.NumCPU(), sync.WaitGroup{}}
return &parser{
pbf: pbf,
coords: coords,
nodes: nodes,
ways: ways,
relations: relations,
nParser: runtime.NumCPU(),
wg: sync.WaitGroup{},
}
}
func (p *parser) Start() {
@ -28,6 +39,12 @@ func (p *parser) Start() {
for block := range blocks {
p.parseBlock(block)
}
if p.waySync != nil {
p.waySync.Sync()
}
if p.relSync != nil {
p.relSync.Sync()
}
p.wg.Done()
}()
}
@ -37,6 +54,14 @@ func (p *parser) Close() {
p.wg.Wait()
}
func (p *parser) NotifyWays(cb func()) {
p.waySync = util.NewSyncPoint(p.nParser, cb)
}
func (p *parser) NotifyRelations(cb func()) {
p.relSync = util.NewSyncPoint(p.nParser, cb)
}
func (p *parser) parseBlock(pos Block) {
block := readPrimitiveBlock(pos)
stringtable := newStringTable(block.GetStringtable())
@ -61,10 +86,19 @@ func (p *parser) parseBlock(pos Block) {
}
parsedWays := readWays(group.Ways, block, stringtable)
if len(parsedWays) > 0 {
if p.waySync != nil {
p.waySync.Sync()
}
p.ways <- parsedWays
}
parsedRelations := readRelations(group.Relations, block, stringtable)
if len(parsedRelations) > 0 {
if p.waySync != nil {
p.waySync.Sync()
}
if p.relSync != nil {
p.relSync.Sync()
}
p.relations <- parsedRelations
}
}

View File

@ -1,6 +1,12 @@
package reader
import (
"os"
"runtime"
"strconv"
"strings"
"sync"
osmcache "imposm3/cache"
"imposm3/element"
"imposm3/geom/geos"
@ -10,11 +16,7 @@ import (
"imposm3/parser/pbf"
"imposm3/proj"
"imposm3/stats"
"os"
"runtime"
"strconv"
"strings"
"sync"
"imposm3/util"
)
var log = logging.NewLogger("reader")
@ -67,6 +69,32 @@ func ReadPbf(cache *osmcache.OSMCache, progress *stats.Statistics,
}
parser := pbf.NewParser(pbfFile, coords, nodes, ways, relations)
coordsSynced := make(chan bool)
coordsSync := util.NewSyncPoint(int(nCoords+nNodes), func() {
coordsSynced <- true
})
parser.NotifyWays(func() {
for i := 0; int64(i) < nCoords; i++ {
coords <- nil
}
for i := 0; int64(i) < nNodes; i++ {
nodes <- nil
}
<-coordsSynced
})
waysSynced := make(chan bool)
waysSync := util.NewSyncPoint(int(nWays), func() {
waysSynced <- true
})
parser.NotifyRelations(func() {
for i := 0; int64(i) < nWays; i++ {
ways <- nil
}
<-waysSynced
})
parser.Start()
waitWriter := sync.WaitGroup{}
@ -74,18 +102,33 @@ func ReadPbf(cache *osmcache.OSMCache, progress *stats.Statistics,
for i := 0; int64(i) < nWays; i++ {
waitWriter.Add(1)
go func() {
var skip, hit int
m := tagmapping.WayTagFilter()
for ws := range ways {
if ws == nil {
waysSync.Sync()
continue
}
if skipWays {
continue
}
for i, _ := range ws {
m.Filter(&ws[i].Tags)
if withLimiter {
if !cache.Coords.FirstRefIsCached(ws[i].Refs) {
ws[i].Id = osmcache.SKIP
skip += 1
} else {
hit += 1
}
}
}
// TODO check withLimiter
cache.Ways.PutWays(ws)
progress.AddWays(len(ws))
}
waitWriter.Done()
}()
}
@ -93,6 +136,8 @@ func ReadPbf(cache *osmcache.OSMCache, progress *stats.Statistics,
for i := 0; int64(i) < nRels; i++ {
waitWriter.Add(1)
go func() {
var skip, hit int
m := tagmapping.RelationTagFilter()
for rels := range relations {
numWithTags := 0
@ -101,11 +146,21 @@ func ReadPbf(cache *osmcache.OSMCache, progress *stats.Statistics,
if len(rels[i].Tags) > 0 {
numWithTags += 1
}
if withLimiter {
if !cache.Ways.FirstMemberIsCached(rels[i].Members) {
skip += 1
rels[i].Id = osmcache.SKIP
} else {
hit += 1
}
}
}
// TODO check withLimiter
cache.Relations.PutRelations(rels)
progress.AddRelations(numWithTags)
}
waitWriter.Done()
}()
}
@ -113,10 +168,12 @@ func ReadPbf(cache *osmcache.OSMCache, progress *stats.Statistics,
for i := 0; int64(i) < nCoords; i++ {
waitWriter.Add(1)
go func() {
var skip, hit int
g := geos.NewGeos()
defer g.Finish()
for nds := range coords {
if skipCoords {
if nds == nil {
coordsSync.Sync()
continue
}
if withLimiter {
@ -124,7 +181,10 @@ func ReadPbf(cache *osmcache.OSMCache, progress *stats.Statistics,
nd := element.Node{Long: nds[i].Long, Lat: nds[i].Lat}
proj.NodeToMerc(&nd)
if !limiter.IntersectsBuffer(g, nd.Long, nd.Lat) {
skip += 1
nds[i].Id = osmcache.SKIP
} else {
hit += 1
}
}
}
@ -142,14 +202,22 @@ func ReadPbf(cache *osmcache.OSMCache, progress *stats.Statistics,
defer g.Finish()
m := tagmapping.NodeTagFilter()
for nds := range nodes {
if nds == nil {
coordsSync.Sync()
continue
}
numWithTags := 0
for i, _ := range nds {
m.Filter(&nds[i].Tags)
if len(nds[i].Tags) > 0 {
numWithTags += 1
}
if withLimiter && !limiter.IntersectsBuffer(g, nds[i].Long, nds[i].Lat) {
nds[i].Id = osmcache.SKIP
if withLimiter {
nd := element.Node{Long: nds[i].Long, Lat: nds[i].Lat}
proj.NodeToMerc(&nd)
if !limiter.IntersectsBuffer(g, nd.Long, nd.Lat) {
nds[i].Id = osmcache.SKIP
}
}
}
cache.Nodes.PutNodes(nds)
@ -160,9 +228,9 @@ func ReadPbf(cache *osmcache.OSMCache, progress *stats.Statistics,
}
parser.Close()
close(coords)
close(nodes)
close(ways)
close(relations)
close(ways)
close(nodes)
close(coords)
waitWriter.Wait()
}

36
util/sync.go Normal file
View File

@ -0,0 +1,36 @@
package util
import (
"sync"
)
type SyncPoint struct {
synced bool
wg sync.WaitGroup
once sync.Once
callbackWg sync.WaitGroup
callback func()
}
func NewSyncPoint(n int, callback func()) *SyncPoint {
s := &SyncPoint{callback: callback}
s.wg.Add(n)
s.callbackWg.Add(1)
return s
}
func (s *SyncPoint) Sync() {
if s.synced {
return
}
s.wg.Done()
s.wg.Wait()
s.once.Do(s.Call)
s.callbackWg.Wait()
}
func (s *SyncPoint) Call() {
s.callback()
s.synced = true
s.callbackWg.Done()
}

32
util/sync_test.go Normal file
View File

@ -0,0 +1,32 @@
package util
import (
"testing"
)
func TestSyncPoint(t *testing.T) {
done := make(chan bool)
check := int32(0)
sp := NewSyncPoint(2, func() {
done <- true
check = 1
})
wait := func() {
if check != 0 {
panic("check set")
}
sp.Sync()
if check != 1 {
panic("check not set")
}
}
go wait()
go wait()
<-done
// does not wait/block
sp.Sync()
}