run generalization and index creation in parallel

master
Oliver Tonnhofer 2013-06-20 13:44:58 +02:00
parent a13ca37fde
commit 1ad73517bb
2 changed files with 147 additions and 47 deletions

View File

@ -8,6 +8,7 @@ import (
"goposm/database"
"goposm/logging"
"goposm/mapping"
"runtime"
"strings"
"sync"
)
@ -199,56 +200,81 @@ func (pg *PostGIS) Finish() error {
}
defer rollbackIfTx(&tx)
for tableName, table := range pg.Tables {
worker := int(runtime.NumCPU() / 2)
if worker > 1 {
worker = 1
}
p := newWorkerPool(worker, len(pg.Tables))
for tableName, tbl := range pg.Tables {
tableName := pg.Prefix + tableName
for _, col := range table.Columns {
if col.Type.Name() == "GEOMETRY" {
sql := fmt.Sprintf(`CREATE INDEX "%s_geom" ON "%s"."%s" USING GIST ("%s")`,
tableName, pg.Schema, tableName, col.Name)
step := log.StartStep(fmt.Sprintf("Creating geometry index on %s", tableName))
_, err := tx.Exec(sql)
log.StopStep(step)
if err != nil {
return err
}
}
if col.FieldType.Name == "id" {
sql := fmt.Sprintf(`CREATE INDEX "%s_osm_id_idx" ON "%s"."%s" USING BTREE ("%s")`,
tableName, pg.Schema, tableName, col.Name)
step := log.StartStep(fmt.Sprintf("Creating OSM id index on %s", tableName))
_, err := tx.Exec(sql)
log.StopStep(step)
if err != nil {
return err
table := tbl
p.in <- func() error {
for _, col := range table.Columns {
if col.Type.Name() == "GEOMETRY" {
sql := fmt.Sprintf(`CREATE INDEX "%s_geom" ON "%s"."%s" USING GIST ("%s")`,
tableName, pg.Schema, tableName, col.Name)
step := log.StartStep(fmt.Sprintf("Creating geometry index on %s", tableName))
_, err := tx.Exec(sql)
log.StopStep(step)
if err != nil {
return err
}
}
if col.FieldType.Name == "id" {
sql := fmt.Sprintf(`CREATE INDEX "%s_osm_id_idx" ON "%s"."%s" USING BTREE ("%s")`,
tableName, pg.Schema, tableName, col.Name)
step := log.StartStep(fmt.Sprintf("Creating OSM id index on %s", tableName))
_, err := tx.Exec(sql)
log.StopStep(step)
if err != nil {
return err
}
}
}
return nil
}
}
for tableName, table := range pg.GeneralizedTables {
err = p.wait()
if err != nil {
return err
}
p = newWorkerPool(worker, len(pg.GeneralizedTables))
for tableName, tbl := range pg.GeneralizedTables {
tableName := pg.Prefix + tableName
for _, col := range table.Source.Columns {
if col.Type.Name() == "GEOMETRY" {
sql := fmt.Sprintf(`CREATE INDEX "%s_geom" ON "%s"."%s" USING GIST ("%s")`,
tableName, pg.Schema, tableName, col.Name)
step := log.StartStep(fmt.Sprintf("Creating geometry index on %s", tableName))
_, err := tx.Exec(sql)
log.StopStep(step)
if err != nil {
return err
}
}
if col.FieldType.Name == "id" {
sql := fmt.Sprintf(`CREATE INDEX "%s_osm_id_idx" ON "%s"."%s" USING BTREE ("%s")`,
tableName, pg.Schema, tableName, col.Name)
step := log.StartStep(fmt.Sprintf("Creating OSM id index on %s", tableName))
_, err := tx.Exec(sql)
log.StopStep(step)
if err != nil {
return err
table := tbl
p.in <- func() error {
for _, col := range table.Source.Columns {
if col.Type.Name() == "GEOMETRY" {
sql := fmt.Sprintf(`CREATE INDEX "%s_geom" ON "%s"."%s" USING GIST ("%s")`,
tableName, pg.Schema, tableName, col.Name)
step := log.StartStep(fmt.Sprintf("Creating geometry index on %s", tableName))
_, err := tx.Exec(sql)
log.StopStep(step)
if err != nil {
return err
}
}
if col.FieldType.Name == "id" {
sql := fmt.Sprintf(`CREATE INDEX "%s_osm_id_idx" ON "%s"."%s" USING BTREE ("%s")`,
tableName, pg.Schema, tableName, col.Name)
step := log.StartStep(fmt.Sprintf("Creating OSM id index on %s", tableName))
_, err := tx.Exec(sql)
log.StopStep(step)
if err != nil {
return err
}
}
}
return nil
}
}
err = p.wait()
if err != nil {
return err
}
err = tx.Commit()
if err != nil {
return err
@ -286,30 +312,54 @@ func (pg *PostGIS) checkGeneralizedTableSources() {
func (pg *PostGIS) Generalize() error {
defer log.StopStep(log.StartStep(fmt.Sprintf("Creating generalized tables")))
worker := int(runtime.NumCPU() / 2)
if worker > 1 {
worker = 1
}
// generalized tables can depend on other generalized tables
// create tables with non-generalized sources first
p := newWorkerPool(worker, len(pg.GeneralizedTables))
for _, table := range pg.GeneralizedTables {
if table.SourceGeneralized == nil {
if err := pg.generalizeTable(table); err != nil {
return err
tbl := table // for following closure
p.in <- func() error {
if err := pg.generalizeTable(tbl); err != nil {
return err
}
tbl.created = true
return nil
}
table.created = true
}
}
err := p.wait()
if err != nil {
return err
}
// next create tables with created generalized sources until
// no new source is created
created := true
for created {
created = false
p := newWorkerPool(worker, len(pg.GeneralizedTables))
for _, table := range pg.GeneralizedTables {
if !table.created && table.SourceGeneralized.created {
if err := pg.generalizeTable(table); err != nil {
return err
tbl := table // for following closure
p.in <- func() error {
if err := pg.generalizeTable(tbl); err != nil {
return err
}
tbl.created = true
created = true
return nil
}
table.created = true
created = true
}
}
err := p.wait()
if err != nil {
return err
}
}
return nil
}

View File

@ -4,6 +4,7 @@ import (
"database/sql"
"fmt"
"strings"
"sync"
)
func schemasFromConnectionParams(params string) (string, string) {
@ -68,3 +69,52 @@ func rollbackIfTx(tx **sql.Tx) {
}
}
}
// workerPool runs functions in n (worker) parallel goroutines.
// wait will return the first error or nil when all functions
// returned succesfull.
type workerPool struct {
in chan func() error
out chan error
wg *sync.WaitGroup
}
func newWorkerPool(worker, tasks int) *workerPool {
p := &workerPool{
make(chan func() error, tasks),
make(chan error, tasks),
&sync.WaitGroup{},
}
for i := 0; i < worker; i++ {
p.wg.Add(1)
go p.workerLoop()
}
return p
}
func (p *workerPool) workerLoop() {
for f := range p.in {
p.out <- f()
}
p.wg.Done()
}
func (p *workerPool) wait() error {
close(p.in)
done := make(chan bool)
go func() {
p.wg.Wait()
done <- true
}()
for {
select {
case err := <-p.out:
if err != nil {
return err
}
case <-done:
return nil
}
}
}