From b3d1dbdbd0a253fd0c1ce3ae31670c55a4ebd965 Mon Sep 17 00:00:00 2001 From: Oliver Tonnhofer Date: Tue, 28 May 2013 14:07:06 +0200 Subject: [PATCH] log steps --- database/postgis/postgis.go | 28 +++++++++++---- goposm.go | 6 ++-- logging/logger.go | 72 ++++++++++++++++++++++++++----------- stats/stats.go | 40 ++++++++++++++------- writer/relations.go | 2 +- writer/ways.go | 4 +-- 6 files changed, 108 insertions(+), 44 deletions(-) diff --git a/database/postgis/postgis.go b/database/postgis/postgis.go index 7ce8e31..40ef993 100644 --- a/database/postgis/postgis.go +++ b/database/postgis/postgis.go @@ -6,11 +6,13 @@ import ( "fmt" "github.com/bmizerany/pq" "goposm/database" + "goposm/logging" "goposm/mapping" - "log" "strings" ) +var log = logging.NewLogger("PostGIS") + type ColumnSpec struct { Name string Type ColumnType @@ -87,7 +89,7 @@ func NewTableSpec(pg *PostGIS, t *mapping.Table) *TableSpec { for _, field := range t.Fields { pgType, ok := pgTypes[field.Type] if !ok { - log.Println("unhandled", field) + log.Errorf("unhandled field %v", field) pgType = pgTypes["string"] } col := ColumnSpec{field.Name, pgType} @@ -325,6 +327,8 @@ func dropTableIfExists(tx *sql.Tx, schema, table string) error { } func (pg *PostGIS) rotate(source, dest, backup string) error { + defer log.StopStep(log.StartStep(fmt.Sprintf("Rotating tables"))) + if err := pg.createSchema(backup); err != nil { return err } @@ -338,7 +342,7 @@ func (pg *PostGIS) rotate(source, dest, backup string) error { for _, tableName := range pg.TableNames() { tableName = pg.Prefix + tableName - log.Printf("rotating %s from %s -> %s -> %s\n", tableName, source, dest, backup) + log.Printf("Rotating %s from %s -> %s -> %s", tableName, source, dest, backup) backupExists, err := tableExists(tx, backup, tableName) if err != nil { @@ -354,7 +358,7 @@ func (pg *PostGIS) rotate(source, dest, backup string) error { } if !sourceExists { - log.Printf("skipping rotate of %s, table does not exists in %s", tableName, source) + log.Warnf("skipping rotate of %s, table does not exists in %s", tableName, source) continue } @@ -399,7 +403,7 @@ func (pg *PostGIS) RevertDeploy() error { func rollbackIfTx(tx **sql.Tx) { if *tx != nil { if err := tx.Rollback(); err != nil { - log.Println("rollback failed", err) + log.Fatal("rollback failed", err) } } } @@ -440,6 +444,8 @@ func (pg *PostGIS) RemoveBackup() error { // Finish creates spatial indices on all tables. func (pg *PostGIS) Finish() error { + defer log.StopStep(log.StartStep(fmt.Sprintf("Creating geometry indices"))) + tx, err := pg.Db.Begin() if err != nil { return err @@ -452,7 +458,9 @@ func (pg *PostGIS) Finish() error { if col.Type.Name() == "GEOMETRY" { sql := fmt.Sprintf(`CREATE INDEX "%s_geom" ON "%s"."%s" USING GIST ("%s")`, tableName, pg.Schema, tableName, col.Name) + step := log.StartStep(fmt.Sprintf("Creating geometry index on %s", tableName)) _, err := tx.Exec(sql) + log.StopStep(step) if err != nil { return err } @@ -465,7 +473,9 @@ func (pg *PostGIS) Finish() error { if col.Type.Name() == "GEOMETRY" { sql := fmt.Sprintf(`CREATE INDEX "%s_geom" ON "%s"."%s" USING GIST ("%s")`, tableName, pg.Schema, tableName, col.Name) + step := log.StartStep(fmt.Sprintf("Creating geometry index on %s", tableName)) _, err := tx.Exec(sql) + log.StopStep(step) if err != nil { return err } @@ -507,7 +517,8 @@ func (pg *PostGIS) checkGeneralizedTableSources() { } func (pg *PostGIS) Generalize() error { - fmt.Println("generalizing") + defer log.StopStep(log.StartStep(fmt.Sprintf("Creating generalized tables"))) + // generalized tables can depend on other generalized tables // create tables with non-generalized sources first for _, table := range pg.GeneralizedTables { @@ -537,6 +548,9 @@ func (pg *PostGIS) Generalize() error { } func (pg *PostGIS) generalizeTable(table *GeneralizedTableSpec) error { + defer log.StopStep(log.StartStep(fmt.Sprintf("Generalizing %s into %s", + pg.Prefix+table.SourceName, pg.Prefix+table.Name))) + tx, err := pg.Db.Begin() if err != nil { return err @@ -561,7 +575,7 @@ func (pg *PostGIS) generalizeTable(table *GeneralizedTableSpec) error { sql := fmt.Sprintf(`CREATE TABLE "%s"."%s" AS (SELECT %s FROM "%s"."%s"%s)`, pg.Schema, table.Name, columnSQL, pg.Schema, pg.Prefix+table.SourceName, where) - fmt.Println(sql) + _, err = tx.Exec(sql) if err != nil { return err diff --git a/goposm.go b/goposm.go index e957e97..7405b77 100644 --- a/goposm.go +++ b/goposm.go @@ -153,16 +153,17 @@ func main() { } if *read != "" { + progress.Start() osmCache.Coords.SetLinearImport(true) reader.ReadPbf(osmCache, progress, tagmapping, *read) osmCache.Coords.SetLinearImport(false) - progress.Reset() + progress.Stop() osmCache.Coords.Flush() } if *write { if true { - progress.Reset() + progress.Start() err = db.Init() if err != nil { die(err) @@ -210,6 +211,7 @@ func main() { nodeWriter.Close() insertBuffer.Close() dbWriter.Close() + progress.Stop() } if db, ok := db.(database.Generalizer); ok { diff --git a/logging/logger.go b/logging/logger.go index 354afc3..cf70737 100644 --- a/logging/logger.go +++ b/logging/logger.go @@ -70,42 +70,61 @@ func (l *Logger) Errorf(msg string, args ...interface{}) { defaultLogBroker.Records <- Record{ERROR, l.Component, fmt.Sprintf(msg, args...)} } +func (l *Logger) Warnf(msg string, args ...interface{}) { + defaultLogBroker.Records <- Record{WARNING, l.Component, fmt.Sprintf(msg, args...)} +} + func (l *Logger) Printfl(level Level, msg string, args ...interface{}) { defaultLogBroker.Records <- Record{level, l.Component, fmt.Sprintf(msg, args...)} } +func (l *Logger) StartStep(msg string) string { + defaultLogBroker.StepStart <- Step{l.Component, msg} + return msg +} + +func (l *Logger) StopStep(msg string) { + defaultLogBroker.StepStop <- Step{l.Component, msg} +} + func NewLogger(component string) *Logger { return &Logger{component} } +type Step struct { + Component string + Name string +} + type LogBroker struct { - Records chan Record - Progress chan string - quit chan bool - wg *sync.WaitGroup + Records chan Record + Progress chan string + StepStart chan Step + StepStop chan Step + quit chan bool + wg *sync.WaitGroup + newline bool + lastProgress string } func (l *LogBroker) loop() { l.wg.Add(1) - newline := true - lastProgress := "" + steps := make(map[Step]time.Time) For: for { select { case record := <-l.Records: - if !newline { - fmt.Print(CLEARLINE) - } l.printRecord(record) - newline = true - if lastProgress != "" { - l.printProgress(lastProgress) - newline = false - } case progress := <-l.Progress: l.printProgress(progress) - lastProgress = progress - newline = false + case step := <-l.StepStart: + steps[step] = time.Now() + l.printProgress(step.Name) + case step := <-l.StepStop: + startTime := steps[step] + delete(steps, step) + duration := time.Since(startTime) + l.printRecord(Record{INFO, step.Component, step.Name + " took: " + duration.String()}) case <-l.quit: break For } @@ -123,13 +142,24 @@ func (l *LogBroker) printComponent(component string) { } func (l *LogBroker) printRecord(record Record) { + if !l.newline { + fmt.Print(CLEARLINE) + } l.printPrefix() l.printComponent(record.Component) fmt.Println(record.Message) + l.newline = true + if l.lastProgress != "" { + l.printProgress(l.lastProgress) + l.newline = false + } } func (l *LogBroker) printProgress(progress string) { l.printPrefix() fmt.Print(progress) + fmt.Print("\r") + l.lastProgress = progress + l.newline = false } func Shutdown() { @@ -141,10 +171,12 @@ var defaultLogBroker LogBroker func init() { defaultLogBroker = LogBroker{ - Records: make(chan Record, 8), - Progress: make(chan string), - quit: make(chan bool), - wg: &sync.WaitGroup{}, + Records: make(chan Record, 8), + Progress: make(chan string), + StepStart: make(chan Step), + StepStop: make(chan Step), + quit: make(chan bool), + wg: &sync.WaitGroup{}, } go defaultLogBroker.loop() } diff --git a/stats/stats.go b/stats/stats.go index bf76f34..a33c07e 100644 --- a/stats/stats.go +++ b/stats/stats.go @@ -65,7 +65,6 @@ func (c *counter) Tick() { // Duration returns the duration since start with seconds precission. func (c *counter) Duration() time.Duration { return time.Duration(int64(time.Since(c.start).Seconds()) * 1000 * 1000 * 1000) - } type Statistics struct { @@ -73,16 +72,25 @@ type Statistics struct { nodes chan int ways chan int relations chan int - reset chan bool + status chan int messages chan string } +const ( + RESET = iota + START + STOP + QUIT +) + func (s *Statistics) AddCoords(n int) { s.coords <- n } func (s *Statistics) AddNodes(n int) { s.nodes <- n } func (s *Statistics) AddWays(n int) { s.ways <- n } func (s *Statistics) AddRelations(n int) { s.relations <- n } -func (s *Statistics) Reset() { s.reset <- true } -func (s *Statistics) Stop() { s.reset <- false } +func (s *Statistics) Reset() { s.status <- RESET } +func (s *Statistics) Stop() { s.status <- STOP } +func (s *Statistics) Start() { s.status <- START } +func (s *Statistics) Quit() { s.status <- QUIT } func (s *Statistics) Message(msg string) { s.messages <- msg } func StatsReporter() *Statistics { @@ -93,12 +101,11 @@ func StatsReporter() *Statistics { s.nodes = make(chan int) s.ways = make(chan int) s.relations = make(chan int) - s.reset = make(chan bool) + s.status = make(chan int) s.messages = make(chan string) go func() { - tick := time.Tick(500 * time.Millisecond) - tock := time.Tick(time.Minute) + var tick, tock <-chan time.Time for { select { case n := <-s.coords: @@ -109,15 +116,24 @@ func StatsReporter() *Statistics { c.ways.Add(n) case n := <-s.relations: c.relations.Add(n) - case v := <-s.reset: - if v { + case v := <-s.status: + switch v { + case RESET: c.PrintStats() c = counter{} c.start = time.Now() - } else { - // stop + case QUIT: c.PrintStats() return + case STOP: + tick = nil + tock = nil + case START: + c.PrintStats() + c = counter{} + c.start = time.Now() + tick = time.Tick(500 * time.Millisecond) + tock = time.Tick(time.Minute) } case msg := <-s.messages: c.PrintTick() @@ -135,7 +151,7 @@ func StatsReporter() *Statistics { func (c *counter) PrintTick() { logging.Progress( - fmt.Sprintf("[%6s] C: %7d/s %7d/s (%10d) N: %7d/s %7d/s (%9d) W: %7d/s %7d/s (%8d) R: %6d/s %6d/s (%7d)\r", + fmt.Sprintf("[%6s] C: %7d/s %7d/s (%10d) N: %7d/s %7d/s (%9d) W: %7d/s %7d/s (%8d) R: %6d/s %6d/s (%7d)", c.Duration(), c.coords.Rps(1000), c.coords.LastRps(1000), diff --git a/writer/relations.go b/writer/relations.go index ef35d2d..7443486 100644 --- a/writer/relations.go +++ b/writer/relations.go @@ -83,7 +83,7 @@ func (rw *RelationWriter) loop() { for _, g := range parts { rel := element.Relation(*r) rel.Geom = &element.Geometry{g, geos.AsWkb(g)} - rw.insertMatches(&r.OSMElem, matches) + rw.insertMatches(&rel.OSMElem, matches) } } else { rw.insertMatches(&r.OSMElem, matches) diff --git a/writer/ways.go b/writer/ways.go index d4a6895..5f080fa 100644 --- a/writer/ways.go +++ b/writer/ways.go @@ -97,9 +97,9 @@ func (ww *WayWriter) buildAndInsert(geos *geos.Geos, w *element.Way, matches []m for _, g := range parts { way := element.Way(*w) way.Geom = &element.Geometry{g, geos.AsWkb(g)} - ww.insertMatches(&w.OSMElem, matches) + ww.insertMatches(&way.OSMElem, matches) } } else { - ww.insertMatches(&w.OSMElem, matches) + ww.insertMatches(&way.OSMElem, matches) } }