log steps

master
Oliver Tonnhofer 2013-05-28 14:07:06 +02:00
parent ac3810e17c
commit b3d1dbdbd0
6 changed files with 108 additions and 44 deletions

View File

@ -6,11 +6,13 @@ import (
"fmt"
"github.com/bmizerany/pq"
"goposm/database"
"goposm/logging"
"goposm/mapping"
"log"
"strings"
)
var log = logging.NewLogger("PostGIS")
type ColumnSpec struct {
Name string
Type ColumnType
@ -87,7 +89,7 @@ func NewTableSpec(pg *PostGIS, t *mapping.Table) *TableSpec {
for _, field := range t.Fields {
pgType, ok := pgTypes[field.Type]
if !ok {
log.Println("unhandled", field)
log.Errorf("unhandled field %v", field)
pgType = pgTypes["string"]
}
col := ColumnSpec{field.Name, pgType}
@ -325,6 +327,8 @@ func dropTableIfExists(tx *sql.Tx, schema, table string) error {
}
func (pg *PostGIS) rotate(source, dest, backup string) error {
defer log.StopStep(log.StartStep(fmt.Sprintf("Rotating tables")))
if err := pg.createSchema(backup); err != nil {
return err
}
@ -338,7 +342,7 @@ func (pg *PostGIS) rotate(source, dest, backup string) error {
for _, tableName := range pg.TableNames() {
tableName = pg.Prefix + tableName
log.Printf("rotating %s from %s -> %s -> %s\n", tableName, source, dest, backup)
log.Printf("Rotating %s from %s -> %s -> %s", tableName, source, dest, backup)
backupExists, err := tableExists(tx, backup, tableName)
if err != nil {
@ -354,7 +358,7 @@ func (pg *PostGIS) rotate(source, dest, backup string) error {
}
if !sourceExists {
log.Printf("skipping rotate of %s, table does not exists in %s", tableName, source)
log.Warnf("skipping rotate of %s, table does not exists in %s", tableName, source)
continue
}
@ -399,7 +403,7 @@ func (pg *PostGIS) RevertDeploy() error {
func rollbackIfTx(tx **sql.Tx) {
if *tx != nil {
if err := tx.Rollback(); err != nil {
log.Println("rollback failed", err)
log.Fatal("rollback failed", err)
}
}
}
@ -440,6 +444,8 @@ func (pg *PostGIS) RemoveBackup() error {
// Finish creates spatial indices on all tables.
func (pg *PostGIS) Finish() error {
defer log.StopStep(log.StartStep(fmt.Sprintf("Creating geometry indices")))
tx, err := pg.Db.Begin()
if err != nil {
return err
@ -452,7 +458,9 @@ func (pg *PostGIS) Finish() error {
if col.Type.Name() == "GEOMETRY" {
sql := fmt.Sprintf(`CREATE INDEX "%s_geom" ON "%s"."%s" USING GIST ("%s")`,
tableName, pg.Schema, tableName, col.Name)
step := log.StartStep(fmt.Sprintf("Creating geometry index on %s", tableName))
_, err := tx.Exec(sql)
log.StopStep(step)
if err != nil {
return err
}
@ -465,7 +473,9 @@ func (pg *PostGIS) Finish() error {
if col.Type.Name() == "GEOMETRY" {
sql := fmt.Sprintf(`CREATE INDEX "%s_geom" ON "%s"."%s" USING GIST ("%s")`,
tableName, pg.Schema, tableName, col.Name)
step := log.StartStep(fmt.Sprintf("Creating geometry index on %s", tableName))
_, err := tx.Exec(sql)
log.StopStep(step)
if err != nil {
return err
}
@ -507,7 +517,8 @@ func (pg *PostGIS) checkGeneralizedTableSources() {
}
func (pg *PostGIS) Generalize() error {
fmt.Println("generalizing")
defer log.StopStep(log.StartStep(fmt.Sprintf("Creating generalized tables")))
// generalized tables can depend on other generalized tables
// create tables with non-generalized sources first
for _, table := range pg.GeneralizedTables {
@ -537,6 +548,9 @@ func (pg *PostGIS) Generalize() error {
}
func (pg *PostGIS) generalizeTable(table *GeneralizedTableSpec) error {
defer log.StopStep(log.StartStep(fmt.Sprintf("Generalizing %s into %s",
pg.Prefix+table.SourceName, pg.Prefix+table.Name)))
tx, err := pg.Db.Begin()
if err != nil {
return err
@ -561,7 +575,7 @@ func (pg *PostGIS) generalizeTable(table *GeneralizedTableSpec) error {
sql := fmt.Sprintf(`CREATE TABLE "%s"."%s" AS (SELECT %s FROM "%s"."%s"%s)`,
pg.Schema, table.Name, columnSQL, pg.Schema,
pg.Prefix+table.SourceName, where)
fmt.Println(sql)
_, err = tx.Exec(sql)
if err != nil {
return err

View File

@ -153,16 +153,17 @@ func main() {
}
if *read != "" {
progress.Start()
osmCache.Coords.SetLinearImport(true)
reader.ReadPbf(osmCache, progress, tagmapping, *read)
osmCache.Coords.SetLinearImport(false)
progress.Reset()
progress.Stop()
osmCache.Coords.Flush()
}
if *write {
if true {
progress.Reset()
progress.Start()
err = db.Init()
if err != nil {
die(err)
@ -210,6 +211,7 @@ func main() {
nodeWriter.Close()
insertBuffer.Close()
dbWriter.Close()
progress.Stop()
}
if db, ok := db.(database.Generalizer); ok {

View File

@ -70,42 +70,61 @@ func (l *Logger) Errorf(msg string, args ...interface{}) {
defaultLogBroker.Records <- Record{ERROR, l.Component, fmt.Sprintf(msg, args...)}
}
func (l *Logger) Warnf(msg string, args ...interface{}) {
defaultLogBroker.Records <- Record{WARNING, l.Component, fmt.Sprintf(msg, args...)}
}
func (l *Logger) Printfl(level Level, msg string, args ...interface{}) {
defaultLogBroker.Records <- Record{level, l.Component, fmt.Sprintf(msg, args...)}
}
func (l *Logger) StartStep(msg string) string {
defaultLogBroker.StepStart <- Step{l.Component, msg}
return msg
}
func (l *Logger) StopStep(msg string) {
defaultLogBroker.StepStop <- Step{l.Component, msg}
}
func NewLogger(component string) *Logger {
return &Logger{component}
}
type Step struct {
Component string
Name string
}
type LogBroker struct {
Records chan Record
Progress chan string
quit chan bool
wg *sync.WaitGroup
Records chan Record
Progress chan string
StepStart chan Step
StepStop chan Step
quit chan bool
wg *sync.WaitGroup
newline bool
lastProgress string
}
func (l *LogBroker) loop() {
l.wg.Add(1)
newline := true
lastProgress := ""
steps := make(map[Step]time.Time)
For:
for {
select {
case record := <-l.Records:
if !newline {
fmt.Print(CLEARLINE)
}
l.printRecord(record)
newline = true
if lastProgress != "" {
l.printProgress(lastProgress)
newline = false
}
case progress := <-l.Progress:
l.printProgress(progress)
lastProgress = progress
newline = false
case step := <-l.StepStart:
steps[step] = time.Now()
l.printProgress(step.Name)
case step := <-l.StepStop:
startTime := steps[step]
delete(steps, step)
duration := time.Since(startTime)
l.printRecord(Record{INFO, step.Component, step.Name + " took: " + duration.String()})
case <-l.quit:
break For
}
@ -123,13 +142,24 @@ func (l *LogBroker) printComponent(component string) {
}
func (l *LogBroker) printRecord(record Record) {
if !l.newline {
fmt.Print(CLEARLINE)
}
l.printPrefix()
l.printComponent(record.Component)
fmt.Println(record.Message)
l.newline = true
if l.lastProgress != "" {
l.printProgress(l.lastProgress)
l.newline = false
}
}
func (l *LogBroker) printProgress(progress string) {
l.printPrefix()
fmt.Print(progress)
fmt.Print("\r")
l.lastProgress = progress
l.newline = false
}
func Shutdown() {
@ -141,10 +171,12 @@ var defaultLogBroker LogBroker
func init() {
defaultLogBroker = LogBroker{
Records: make(chan Record, 8),
Progress: make(chan string),
quit: make(chan bool),
wg: &sync.WaitGroup{},
Records: make(chan Record, 8),
Progress: make(chan string),
StepStart: make(chan Step),
StepStop: make(chan Step),
quit: make(chan bool),
wg: &sync.WaitGroup{},
}
go defaultLogBroker.loop()
}

View File

@ -65,7 +65,6 @@ func (c *counter) Tick() {
// Duration returns the duration since start with seconds precission.
func (c *counter) Duration() time.Duration {
return time.Duration(int64(time.Since(c.start).Seconds()) * 1000 * 1000 * 1000)
}
type Statistics struct {
@ -73,16 +72,25 @@ type Statistics struct {
nodes chan int
ways chan int
relations chan int
reset chan bool
status chan int
messages chan string
}
const (
RESET = iota
START
STOP
QUIT
)
func (s *Statistics) AddCoords(n int) { s.coords <- n }
func (s *Statistics) AddNodes(n int) { s.nodes <- n }
func (s *Statistics) AddWays(n int) { s.ways <- n }
func (s *Statistics) AddRelations(n int) { s.relations <- n }
func (s *Statistics) Reset() { s.reset <- true }
func (s *Statistics) Stop() { s.reset <- false }
func (s *Statistics) Reset() { s.status <- RESET }
func (s *Statistics) Stop() { s.status <- STOP }
func (s *Statistics) Start() { s.status <- START }
func (s *Statistics) Quit() { s.status <- QUIT }
func (s *Statistics) Message(msg string) { s.messages <- msg }
func StatsReporter() *Statistics {
@ -93,12 +101,11 @@ func StatsReporter() *Statistics {
s.nodes = make(chan int)
s.ways = make(chan int)
s.relations = make(chan int)
s.reset = make(chan bool)
s.status = make(chan int)
s.messages = make(chan string)
go func() {
tick := time.Tick(500 * time.Millisecond)
tock := time.Tick(time.Minute)
var tick, tock <-chan time.Time
for {
select {
case n := <-s.coords:
@ -109,15 +116,24 @@ func StatsReporter() *Statistics {
c.ways.Add(n)
case n := <-s.relations:
c.relations.Add(n)
case v := <-s.reset:
if v {
case v := <-s.status:
switch v {
case RESET:
c.PrintStats()
c = counter{}
c.start = time.Now()
} else {
// stop
case QUIT:
c.PrintStats()
return
case STOP:
tick = nil
tock = nil
case START:
c.PrintStats()
c = counter{}
c.start = time.Now()
tick = time.Tick(500 * time.Millisecond)
tock = time.Tick(time.Minute)
}
case msg := <-s.messages:
c.PrintTick()
@ -135,7 +151,7 @@ func StatsReporter() *Statistics {
func (c *counter) PrintTick() {
logging.Progress(
fmt.Sprintf("[%6s] C: %7d/s %7d/s (%10d) N: %7d/s %7d/s (%9d) W: %7d/s %7d/s (%8d) R: %6d/s %6d/s (%7d)\r",
fmt.Sprintf("[%6s] C: %7d/s %7d/s (%10d) N: %7d/s %7d/s (%9d) W: %7d/s %7d/s (%8d) R: %6d/s %6d/s (%7d)",
c.Duration(),
c.coords.Rps(1000),
c.coords.LastRps(1000),

View File

@ -83,7 +83,7 @@ func (rw *RelationWriter) loop() {
for _, g := range parts {
rel := element.Relation(*r)
rel.Geom = &element.Geometry{g, geos.AsWkb(g)}
rw.insertMatches(&r.OSMElem, matches)
rw.insertMatches(&rel.OSMElem, matches)
}
} else {
rw.insertMatches(&r.OSMElem, matches)

View File

@ -97,9 +97,9 @@ func (ww *WayWriter) buildAndInsert(geos *geos.Geos, w *element.Way, matches []m
for _, g := range parts {
way := element.Way(*w)
way.Geom = &element.Geometry{g, geos.AsWkb(g)}
ww.insertMatches(&w.OSMElem, matches)
ww.insertMatches(&way.OSMElem, matches)
}
} else {
ww.insertMatches(&w.OSMElem, matches)
ww.insertMatches(&way.OSMElem, matches)
}
}