write everything to cache in batches

master
Oliver Tonnhofer 2013-04-20 16:50:23 +02:00
parent 8efa1fae9b
commit 7f28d079fb
3 changed files with 77 additions and 32 deletions

41
cache/db.go vendored
View File

@ -55,10 +55,13 @@ func NewCoordsCache(path string) (*CoordsCache, error) {
type WaysCache struct {
Cache
toWrite chan []element.Way
}
func NewWaysCache(path string) (*WaysCache, error) {
cache := WaysCache{}
cache.toWrite = make(chan []element.Way)
go cache.wayWriter()
err := cache.open(path)
if err != nil {
return nil, err
@ -176,6 +179,28 @@ func (p *WaysCache) PutWays(ways []element.Way) error {
return p.db.Write(p.wo, batch)
}
func (p *WaysCache) _PutWays(ways []element.Way) {
p.toWrite <- ways
}
func (p *WaysCache) wayWriter() {
for ways := range p.toWrite {
batch := levigo.NewWriteBatch()
defer batch.Close()
keyBuf := make([]byte, 8)
for _, way := range ways {
bin.PutVarint(keyBuf, int64(way.Id))
data, err := binary.MarshalWay(&way)
if err != nil {
panic(err)
}
batch.Put(keyBuf, data)
}
_ = p.db.Write(p.wo, batch)
}
}
func (p *WaysCache) GetWay(id int64) (*element.Way, error) {
keyBuf := make([]byte, 8)
bin.PutVarint(keyBuf, int64(id))
@ -203,6 +228,22 @@ func (p *RelationsCache) PutRelation(relation *element.Relation) error {
return p.db.Put(p.wo, keyBuf, data)
}
func (p *RelationsCache) PutRelations(rels []element.Relation) error {
batch := levigo.NewWriteBatch()
defer batch.Close()
keyBuf := make([]byte, 8)
for _, rel := range rels {
bin.PutVarint(keyBuf, int64(rel.Id))
data, err := binary.MarshalRelation(&rel)
if err != nil {
panic(err)
}
batch.Put(keyBuf, data)
}
return p.db.Write(p.wo, batch)
}
func (p *RelationsCache) GetRelation(id int64) (*element.Relation, error) {
keyBuf := make([]byte, 8)
bin.PutVarint(keyBuf, int64(id))

View File

@ -14,12 +14,12 @@ import (
func parse(filename string) {
nodes := make(chan []element.Node)
ways := make(chan []element.Way)
relations := make(chan element.Relation)
relations := make(chan []element.Relation)
positions := parser.PBFBlockPositions(filename)
waitParser := sync.WaitGroup{}
for i := 0; i < 2; i++ {
for i := 0; i < runtime.NumCPU(); i++ {
waitParser.Add(1)
go func() {
for pos := range positions {
@ -35,13 +35,13 @@ func parse(filename string) {
log.Fatal(err)
}
defer wayCache.Close()
for i := 0; i < 2; i++ {
for i := 0; i < runtime.NumCPU(); i++ {
waitCounter.Add(1)
go func() {
wayCounter := 0
for ws := range ways {
wayCache.PutWays(ws)
wayCounter += 1
wayCounter += len(ws)
}
fmt.Println("ways", wayCounter)
waitCounter.Done()
@ -52,32 +52,30 @@ func parse(filename string) {
log.Fatal(err)
}
defer relCache.Close()
waitCounter.Add(1)
go func() {
relationCounter := 0
for rel := range relations {
relCache.PutRelation(&rel)
relationCounter += 1
}
fmt.Println("relations", relationCounter)
waitCounter.Done()
}()
for i := 0; i < runtime.NumCPU(); i++ {
waitCounter.Add(1)
go func() {
relationCounter := 0
for rels := range relations {
relCache.PutRelations(rels)
relationCounter += len(rels)
}
fmt.Println("relations", relationCounter)
waitCounter.Done()
}()
}
nodeCache, err := cache.NewDeltaCoordsCache("/tmp/goposm/node.cache")
if err != nil {
log.Fatal(err)
}
defer nodeCache.Close()
for i := 0; i < 2; i++ {
for i := 0; i < runtime.NumCPU(); i++ {
waitCounter.Add(1)
go func() {
nodeCounter := 0
for nds := range nodes {
if len(nds) == 0 {
continue
}
nodeCache.PutCoords(nds)
nodeCounter += 1
nodeCounter += len(nds)
}
fmt.Println("nodes", nodeCounter)
waitCounter.Done()
@ -110,6 +108,12 @@ func parse(filename string) {
}
func main() {
//f, err := os.Create("/tmp/goposm.pprof")
//if err != nil {
//log.Fatal(err)
//}
//pprof.StartCPUProfile(f)
//defer pprof.StopCPUProfile()
runtime.GOMAXPROCS(runtime.NumCPU())
parse(os.Args[1])
//parser.PBFStats(os.Args[1])

View File

@ -212,7 +212,7 @@ func PBFBlockPositions(filename string) chan BlockPosition {
return pbf.BlockPositions()
}
func ParseBlock(pos BlockPosition, nodes chan []element.Node, ways chan []element.Way, relations chan element.Relation) {
func ParseBlock(pos BlockPosition, nodes chan []element.Node, ways chan []element.Way, relations chan []element.Relation) {
block := ReadPrimitiveBlock(pos)
stringtable := NewStringTable(block.GetStringtable())
@ -220,21 +220,21 @@ func ParseBlock(pos BlockPosition, nodes chan []element.Node, ways chan []elemen
dense := group.GetDense()
if dense != nil {
parsedNodes := ReadDenseNodes(dense, block, stringtable)
nodes <- parsedNodes
if len(parsedNodes) > 0 {
nodes <- parsedNodes
}
}
parsedNodes := ReadNodes(group.Nodes, block, stringtable)
nodes <- parsedNodes
//for _, node := range parsedNodes {
//nodes <- node
//}
if len(parsedNodes) > 0 {
nodes <- parsedNodes
}
parsedWays := ReadWays(group.Ways, block, stringtable)
ways <- parsedWays
//for _, way := range parsedWays {
//ways <- way
//}
if len(parsedWays) > 0 {
ways <- parsedWays
}
parsedRelations := ReadRelations(group.Relations, block, stringtable)
for _, rel := range parsedRelations {
relations <- rel
if len(parsedRelations) > 0 {
relations <- parsedRelations
}
}