imposm3/goposm.go

324 lines
6.5 KiB
Go
Raw Normal View History

2012-12-26 17:07:30 +04:00
package main
import (
"flag"
2013-04-08 23:45:13 +04:00
"goposm/cache"
2013-05-08 18:45:14 +04:00
"goposm/db"
2013-04-08 23:45:13 +04:00
"goposm/element"
2013-05-08 18:45:14 +04:00
"goposm/geom"
"goposm/geom/geos"
2013-05-07 12:13:09 +04:00
"goposm/mapping"
2013-02-12 22:45:49 +04:00
"goposm/parser"
2013-05-08 18:45:14 +04:00
"goposm/proj"
2013-05-06 13:03:52 +04:00
"goposm/stats"
2013-04-16 23:14:19 +04:00
"log"
"os"
2013-04-08 23:45:13 +04:00
"runtime"
"runtime/pprof"
"strconv"
2013-05-13 12:21:12 +04:00
"strings"
2013-04-08 23:45:13 +04:00
"sync"
2013-05-13 12:21:12 +04:00
"time"
2012-12-26 17:07:30 +04:00
)
2013-05-07 13:42:05 +04:00
var skipCoords, skipNodes, skipWays bool
var dbImportBatchSize int64
2013-05-07 13:42:05 +04:00
func init() {
if os.Getenv("GOPOSM_SKIP_COORDS") != "" {
skipCoords = true
}
if os.Getenv("GOPOSM_SKIP_NODES") != "" {
skipNodes = true
}
if os.Getenv("GOPOSM_SKIP_WAYS") != "" {
skipWays = true
}
dbImportBatchSize, _ = strconv.ParseInt(
os.Getenv("GOPOSM_DBIMPORT_BATCHSIZE"), 10, 32)
if dbImportBatchSize == 0 {
dbImportBatchSize = 4096
}
2013-05-07 13:42:05 +04:00
}
2013-05-10 12:57:06 +04:00
type ErrorLevel interface {
Level() int
}
2013-05-06 13:03:52 +04:00
func parse(cache *cache.OSMCache, progress *stats.Statistics, filename string) {
nodes := make(chan []element.Node)
coords := make(chan []element.Node)
ways := make(chan []element.Way)
2013-04-20 18:50:23 +04:00
relations := make(chan []element.Relation)
2013-04-08 23:45:13 +04:00
positions := parser.PBFBlockPositions(filename)
waitParser := sync.WaitGroup{}
2013-04-20 18:50:23 +04:00
for i := 0; i < runtime.NumCPU(); i++ {
2013-04-08 23:45:13 +04:00
waitParser.Add(1)
go func() {
for pos := range positions {
parser.ParseBlock(
pos,
coords,
nodes,
ways,
relations,
)
2013-04-08 23:45:13 +04:00
}
//runtime.GC()
2013-04-08 23:45:13 +04:00
waitParser.Done()
}()
}
waitCounter := sync.WaitGroup{}
2013-05-04 18:27:05 +04:00
2013-04-20 18:50:23 +04:00
for i := 0; i < runtime.NumCPU(); i++ {
waitCounter.Add(1)
go func() {
for ws := range ways {
2013-05-07 13:42:05 +04:00
if skipWays {
continue
}
2013-05-07 12:13:09 +04:00
for _, w := range ws {
mapping.WayTags.Filter(w.Tags)
}
2013-05-04 18:27:05 +04:00
cache.Ways.PutWays(ws)
2013-05-06 13:03:52 +04:00
progress.AddWays(len(ws))
}
waitCounter.Done()
}()
}
2013-04-20 18:50:23 +04:00
for i := 0; i < runtime.NumCPU(); i++ {
waitCounter.Add(1)
go func() {
for rels := range relations {
2013-05-07 12:13:09 +04:00
for _, r := range rels {
mapping.RelationTags.Filter(r.Tags)
}
2013-05-04 18:27:05 +04:00
cache.Relations.PutRelations(rels)
2013-05-06 13:03:52 +04:00
progress.AddRelations(len(rels))
2013-04-20 18:50:23 +04:00
}
waitCounter.Done()
}()
}
for i := 0; i < runtime.NumCPU(); i++ {
2013-04-08 23:45:13 +04:00
waitCounter.Add(1)
go func() {
for nds := range coords {
2013-05-07 13:42:05 +04:00
if skipCoords {
continue
}
2013-05-04 18:27:05 +04:00
cache.Coords.PutCoords(nds)
2013-05-06 13:03:52 +04:00
progress.AddCoords(len(nds))
2013-04-08 23:45:13 +04:00
}
waitCounter.Done()
}()
}
for i := 0; i < 2; i++ {
waitCounter.Add(1)
go func() {
for nds := range nodes {
2013-05-07 13:42:05 +04:00
if skipNodes {
continue
}
2013-05-07 12:13:09 +04:00
for _, nd := range nds {
ok := mapping.PointTags.Filter(nd.Tags)
if !ok {
nd.Tags = nil
}
}
2013-05-04 18:27:05 +04:00
n, _ := cache.Nodes.PutNodes(nds)
2013-05-06 13:03:52 +04:00
progress.AddNodes(n)
}
2013-04-08 23:45:13 +04:00
waitCounter.Done()
}()
}
2013-04-08 23:45:13 +04:00
waitParser.Wait()
close(coords)
2013-04-08 23:45:13 +04:00
close(nodes)
close(ways)
close(relations)
waitCounter.Wait()
}
2013-05-06 13:10:37 +04:00
var (
cpuprofile = flag.String("cpuprofile", "", "filename of cpu profile output")
2013-05-13 12:21:12 +04:00
memprofile = flag.String("memprofile", "", "dir name of mem profile output and interval (fname:interval)")
cachedir = flag.String("cachedir", "/tmp/goposm", "cache directory")
overwritecache = flag.Bool("overwritecache", false, "overwritecache")
appendcache = flag.Bool("appendcache", false, "append cache")
read = flag.String("read", "", "read")
write = flag.Bool("write", false, "write")
2013-05-10 12:09:52 +04:00
connection = flag.String("connection", "", "connection parameters")
diff = flag.Bool("diff", false, "enable diff support")
2013-05-06 13:10:37 +04:00
)
2012-12-26 17:07:30 +04:00
func main() {
log.SetFlags(log.LstdFlags | log.Lshortfile)
2013-05-06 13:10:37 +04:00
runtime.GOMAXPROCS(runtime.NumCPU())
flag.Parse()
if *cpuprofile != "" {
f, err := os.Create(*cpuprofile)
if err != nil {
log.Fatal(err)
}
pprof.StartCPUProfile(f)
defer pprof.StopCPUProfile()
}
2013-05-13 12:21:12 +04:00
if *memprofile != "" {
parts := strings.Split(*memprofile, string(os.PathListSeparator))
var interval time.Duration
if len(parts) < 2 {
interval, _ = time.ParseDuration("1m")
} else {
var err error
interval, err = time.ParseDuration(parts[1])
if err != nil {
log.Fatal(err)
}
}
go stats.MemProfiler(parts[0], interval)
}
osmCache := cache.NewOSMCache(*cachedir)
if *read != "" && osmCache.Exists() {
if *overwritecache {
log.Println("removing existing cache", *cachedir)
err := osmCache.Remove()
if err != nil {
log.Fatal("unable to remove cache:", err)
}
} else if !*appendcache {
log.Fatal("cache already exists use -appendcache or -overwritecache")
}
}
err := osmCache.Open()
if err != nil {
log.Fatal(err)
}
defer osmCache.Close()
2013-05-04 18:27:05 +04:00
2013-05-06 13:03:52 +04:00
progress := stats.StatsReporter()
if *read != "" {
osmCache.Coords.SetLinearImport(true)
parse(osmCache, progress, *read)
osmCache.Coords.SetLinearImport(false)
2013-05-10 15:55:14 +04:00
progress.Reset()
}
if *write {
progress.Reset()
rel := osmCache.Relations.Iter()
for _ = range rel {
progress.AddRelations(1)
// fmt.Println(r)
}
way := osmCache.Ways.Iter()
diffCache := cache.NewDiffCache(*cachedir)
if err = diffCache.Remove(); err != nil {
log.Fatal(err)
}
if err = diffCache.Open(); err != nil {
log.Fatal(err)
}
2013-05-06 12:21:03 +04:00
waitFill := sync.WaitGroup{}
2013-05-08 18:45:14 +04:00
wayChan := make(chan []element.Way)
waitDb := &sync.WaitGroup{}
2013-05-10 12:09:52 +04:00
config := db.Config{"postgres", *connection, 3857, "public"}
2013-05-08 18:45:14 +04:00
pg, err := db.Open(config)
if err != nil {
log.Fatal(err)
}
specs := []db.TableSpec{
{
"goposm_test",
config.Schema,
[]db.ColumnSpec{
{"name", "VARCHAR"},
{"highway", "VARCHAR"},
},
"LINESTRING",
config.Srid,
},
}
err = pg.Init(specs)
if err != nil {
log.Fatal(err)
}
for i := 0; i < runtime.NumCPU(); i++ {
2013-05-08 18:45:14 +04:00
waitDb.Add(1)
go func() {
for ways := range wayChan {
2013-05-13 11:53:38 +04:00
err := pg.InsertWays(ways, specs[0])
if err != nil {
log.Fatal(err)
}
2013-05-08 18:45:14 +04:00
}
waitDb.Done()
}()
}
2013-05-06 12:21:03 +04:00
2013-05-08 18:45:14 +04:00
for i := 0; i < runtime.NumCPU(); i++ {
waitFill.Add(1)
go func() {
2013-05-13 11:53:38 +04:00
var err error
2013-05-08 18:45:14 +04:00
geos := geos.NewGEOS()
defer geos.Finish()
batch := make([]element.Way, 0, dbImportBatchSize)
for w := range way {
progress.AddWays(1)
ok := osmCache.Coords.FillWay(w)
if !ok {
continue
}
2013-05-08 18:45:14 +04:00
proj.NodesToMerc(w.Nodes)
w.Wkb, err = geom.LineStringWKB(geos, w.Nodes)
if err != nil {
2013-05-10 12:57:06 +04:00
if err, ok := err.(ErrorLevel); ok {
if err.Level() <= 0 {
continue
}
}
2013-05-08 18:45:14 +04:00
log.Println(err)
continue
}
batch = append(batch, *w)
if len(batch) >= int(dbImportBatchSize) {
2013-05-08 18:45:14 +04:00
wayChan <- batch
batch = make([]element.Way, 0, dbImportBatchSize)
2013-05-08 18:45:14 +04:00
}
if *diff {
2013-05-13 11:54:00 +04:00
diffCache.Coords.AddFromWay(w)
2013-05-06 12:21:03 +04:00
}
}
2013-05-08 18:45:14 +04:00
wayChan <- batch
waitFill.Done()
}()
}
waitFill.Wait()
2013-05-08 18:45:14 +04:00
close(wayChan)
waitDb.Wait()
2013-05-13 11:54:00 +04:00
diffCache.Coords.Close()
}
2013-05-10 15:55:14 +04:00
progress.Stop()
2013-04-08 23:45:13 +04:00
//parser.PBFStats(os.Args[1])
2012-12-26 17:07:30 +04:00
}