From 07f4009033dbb390f46108d74878035b5397965b Mon Sep 17 00:00:00 2001 From: Oliver Tonnhofer Date: Mon, 29 Jul 2013 15:47:09 +0200 Subject: [PATCH] refactored progress statistics (display progress in percent) --- diff/process.go | 5 +- goposm.go | 11 +- reader/reader.go | 12 ++- stats/stats.go | 267 ++++++++++++++++++++++------------------------- writer/ways.go | 2 +- 5 files changed, 142 insertions(+), 155 deletions(-) diff --git a/diff/process.go b/diff/process.go index 71b6e42..99c07e1 100644 --- a/diff/process.go +++ b/diff/process.go @@ -87,7 +87,7 @@ func Update(oscFile string, force bool) { tagmapping.PolygonMatcher(), ) - progress := stats.StatsReporter() + progress := stats.NewStatsReporter() var geometryClipper *clipper.Clipper @@ -128,7 +128,6 @@ func Update(oscFile string, force bool) { step := log.StartStep("Parsing changes, updating cache and removing elements") - progress.Start() For: for { select { @@ -196,7 +195,7 @@ For: log.StopStep(step) step = log.StartStep("Writing added/modified elements") - progress.Start() + progress = stats.NewStatsReporter() for nodeId, _ := range nodeIds { node, err := osmCache.Nodes.GetNode(nodeId) diff --git a/goposm.go b/goposm.go index 5e2fc24..44101df 100644 --- a/goposm.go +++ b/goposm.go @@ -135,8 +135,6 @@ func mainimport() { log.StopStep(step) } - progress := stats.StatsReporter() - tagmapping, err := mapping.NewMapping(config.ImportOptions.Base.MappingFile) if err != nil { log.Fatal("mapping file: ", err) @@ -173,13 +171,15 @@ func mainimport() { step := log.StartStep("Imposm") + var elementCounts *stats.ElementCounts + if config.ImportOptions.Read != "" { step := log.StartStep("Reading OSM data") err = osmCache.Open() if err != nil { log.Fatal(err) } - progress.Start() + progress := stats.NewStatsReporter() pbfFile, err := pbf.Open(config.ImportOptions.Read) if err != nil { @@ -189,7 +189,7 @@ func mainimport() { osmCache.Coords.SetLinearImport(true) reader.ReadPbf(osmCache, progress, tagmapping, pbfFile) osmCache.Coords.SetLinearImport(false) - progress.Stop() + elementCounts = progress.Stop() osmCache.Close() log.StopStep(step) if config.ImportOptions.Diff { @@ -203,7 +203,8 @@ func mainimport() { if config.ImportOptions.Write { stepImport := log.StartStep("Importing OSM data") stepWrite := log.StartStep("Writing OSM data") - progress.Start() + progress := stats.NewStatsReporterWithEstimate(elementCounts) + err = db.Init() if err != nil { log.Fatal(err) diff --git a/reader/reader.go b/reader/reader.go index f7033cb..bdad1d3 100644 --- a/reader/reader.go +++ b/reader/reader.go @@ -98,11 +98,15 @@ func ReadPbf(cache *cache.OSMCache, progress *stats.Statistics, tagmapping *mapp go func() { m := tagmapping.RelationTagFilter() for rels := range relations { + numWithTags := 0 for i, _ := range rels { m.Filter(&rels[i].Tags) + if len(rels[i].Tags) > 0 { + numWithTags += 1 + } } cache.Relations.PutRelations(rels) - progress.AddRelations(len(rels)) + progress.AddRelations(numWithTags) } waitWriter.Done() }() @@ -127,11 +131,15 @@ func ReadPbf(cache *cache.OSMCache, progress *stats.Statistics, tagmapping *mapp go func() { m := tagmapping.NodeTagFilter() for nds := range nodes { + numWithTags := 0 for i, _ := range nds { m.Filter(&nds[i].Tags) + if len(nds[i].Tags) > 0 { + numWithTags += 1 + } } cache.Nodes.PutNodes(nds) - progress.AddNodes(len(nds)) + progress.AddNodes(numWithTags) } waitWriter.Done() }() diff --git a/stats/stats.go b/stats/stats.go index c474b79..aea3f33 100644 --- a/stats/stats.go +++ b/stats/stats.go @@ -6,76 +6,62 @@ import ( "time" ) -type RpsCounter struct { - counter int64 - lastAdd int64 - start time.Time - stop time.Time - updated bool -} - -func (r *RpsCounter) Add(n int) { - r.counter += int64(n) - r.lastAdd += int64(n) - if n > 0 { - if r.start.IsZero() { - r.start = time.Now() - } - r.updated = true - } -} - -func (r *RpsCounter) Value() int64 { - return r.counter -} - -func (r *RpsCounter) Rps() float64 { - return float64(r.counter) / float64(r.stop.Sub(r.start).Seconds()) -} - -func (r *RpsCounter) LastRps() float64 { - return float64(r.lastAdd) / float64(time.Since(r.stop).Seconds()) -} - -func (r *RpsCounter) Tick() { - if r.updated { - r.stop = time.Now() - r.updated = false - } - r.lastAdd = 0 -} - -func roundInt(val float64, round int) int64 { - return int64(val/float64(round)) * int64(round) -} - -type counter struct { +type Counter struct { start time.Time - coords RpsCounter - nodes RpsCounter - ways RpsCounter - relations RpsCounter + Coords *RpsCounter + Nodes *RpsCounter + Ways *RpsCounter + Relations *RpsCounter } -func (c *counter) Tick() { - c.coords.Tick() - c.nodes.Tick() - c.ways.Tick() - c.relations.Tick() +func (c *Counter) Tick() { + c.Coords.Tick() + c.Nodes.Tick() + c.Ways.Tick() + c.Relations.Tick() +} + +func NewCounter() *Counter { + return &Counter{ + start: time.Now(), + Coords: NewRpsCounter(), + Nodes: NewRpsCounter(), + Ways: NewRpsCounter(), + Relations: NewRpsCounter(), + } +} + +func NewCounterWithEstimate(counts ElementCounts) *Counter { + counter := NewCounter() + counter.Coords.total = counts.Coords.Current + counter.Nodes.total = counts.Nodes.Current + counter.Ways.total = counts.Ways.Current + counter.Relations.total = counts.Relations.Current + return counter +} + +type ElementCounts struct { + Coords, Nodes, Ways, Relations ElementCount } // Duration returns the duration since start with seconds precission. -func (c *counter) Duration() time.Duration { +func (c *Counter) CurrentCount() *ElementCounts { + return &ElementCounts{ + Coords: c.Coords.Count(), + Nodes: c.Nodes.Count(), + Ways: c.Ways.Count(), + Relations: c.Relations.Count(), + } +} + +// Duration returns the duration since start with seconds precission. +func (c *Counter) Duration() time.Duration { return time.Duration(int64(time.Since(c.start).Seconds()) * 1000 * 1000 * 1000) } type Statistics struct { - coords chan int - nodes chan int - ways chan int - relations chan int - status chan int - messages chan string + counter *Counter + done chan bool } const ( @@ -85,101 +71,94 @@ const ( QUIT ) -func (s *Statistics) AddCoords(n int) { s.coords <- n } -func (s *Statistics) AddNodes(n int) { s.nodes <- n } -func (s *Statistics) AddWays(n int) { s.ways <- n } -func (s *Statistics) AddRelations(n int) { s.relations <- n } -func (s *Statistics) Reset() { s.status <- RESET } -func (s *Statistics) Stop() { s.status <- STOP } -func (s *Statistics) Start() { s.status <- START } -func (s *Statistics) Quit() { s.status <- QUIT } -func (s *Statistics) Message(msg string) { s.messages <- msg } +func (s *Statistics) AddCoords(n int) { s.counter.Coords.Add(n) } +func (s *Statistics) AddNodes(n int) { s.counter.Nodes.Add(n) } +func (s *Statistics) AddWays(n int) { s.counter.Ways.Add(n) } +func (s *Statistics) AddRelations(n int) { s.counter.Relations.Add(n) } +func (s *Statistics) Stop() *ElementCounts { + s.done <- true + return s.counter.CurrentCount() +} -func StatsReporter() *Statistics { - c := counter{} - c.start = time.Now() +func NewStatsReporter() *Statistics { s := Statistics{} - s.coords = make(chan int) - s.nodes = make(chan int) - s.ways = make(chan int) - s.relations = make(chan int) - s.status = make(chan int) - s.messages = make(chan string) + s.counter = NewCounter() + s.done = make(chan bool) - go func() { - var tick, tock <-chan time.Time - for { - select { - case n := <-s.coords: - c.coords.Add(n) - case n := <-s.nodes: - c.nodes.Add(n) - case n := <-s.ways: - c.ways.Add(n) - case n := <-s.relations: - c.relations.Add(n) - case v := <-s.status: - switch v { - case RESET: - c.PrintStats() - c = counter{} - c.start = time.Now() - case QUIT: - c.PrintStats() - return - case STOP: - tick = nil - tock = nil - c.PrintStats() - case START: - c = counter{} - c.start = time.Now() - tick = time.Tick(500 * time.Millisecond) - tock = time.Tick(time.Minute) - } - case msg := <-s.messages: - c.PrintTick() - fmt.Println("\n", msg) - case <-tock: - c.PrintStats() - case <-tick: - c.PrintTick() - c.Tick() - } - } - }() + go s.loop() return &s } -func (c *counter) PrintTick() { +func NewStatsReporterWithEstimate(counts *ElementCounts) *Statistics { + s := Statistics{} + if counts != nil { + s.counter = NewCounterWithEstimate(*counts) + } else { + s.counter = NewCounter() + } + s.done = make(chan bool) + + go s.loop() + return &s +} + +func (s *Statistics) loop() { + tick := time.Tick(500 * time.Millisecond) + tock := time.Tick(time.Minute) + for { + select { + case <-s.done: + s.counter.PrintStats() + return + case <-tock: + s.counter.PrintStats() + case <-tick: + s.counter.PrintTick() + s.counter.Tick() + } + } +} + +func fmtPercentOrVal(progress float64, value int64) string { + if progress == -1.0 { + return fmt.Sprintf("%d", value) + } + return fmt.Sprintf("%4.1f%%", progress*100) +} + +func roundInt(val float64, round int) int64 { + return int64(val/float64(round)) * int64(round) +} + +func (c *Counter) PrintTick() { logging.Progress( - fmt.Sprintf("[%6s] C: %7d/s %7d/s (%10d) N: %7d/s %7d/s (%9d) W: %7d/s %7d/s (%8d) R: %6d/s %6d/s (%7d)", + fmt.Sprintf("[%6s] C: %7d/s %7d/s (%s) N: %7d/s %7d/s (%s) W: %7d/s %7d/s (%s) R: %6d/s %6d/s (%s)", c.Duration(), - roundInt(c.coords.Rps(), 1000), - roundInt(c.coords.LastRps(), 1000), - c.coords.Value(), - roundInt(c.nodes.Rps(), 100), - roundInt(c.nodes.LastRps(), 100), - c.nodes.Value(), - roundInt(c.ways.Rps(), 100), - roundInt(c.ways.LastRps(), 100), - c.ways.Value(), - roundInt(c.relations.Rps(), 10), - roundInt(c.relations.LastRps(), 10), - c.relations.Value(), + roundInt(c.Coords.Rps(), 1000), + roundInt(c.Coords.LastRps(), 1000), + fmtPercentOrVal(c.Coords.Progress(), c.Coords.Value()), + roundInt(c.Nodes.Rps(), 100), + roundInt(c.Nodes.LastRps(), 100), + fmtPercentOrVal(c.Nodes.Progress(), c.Nodes.Value()), + roundInt(c.Ways.Rps(), 100), + roundInt(c.Ways.LastRps(), 100), + fmtPercentOrVal(c.Ways.Progress(), c.Ways.Value()), + roundInt(c.Relations.Rps(), 10), + roundInt(c.Relations.LastRps(), 10), + fmtPercentOrVal(c.Relations.Progress(), c.Relations.Value()), )) } -func (c *counter) PrintStats() { - logging.Infof("[%6s] C: %7d/s (%10d) N: %7d/s (%9d) W: %7d/s (%8d) R: %6d/s (%7d)", +func (c *Counter) PrintStats() { + logging.Infof("[%6s] C: %7d/s (%s) N: %7d/s (%s) W: %7d/s (%s) R: %6d/s (%s)", c.Duration(), - roundInt(c.coords.Rps(), 1000), - c.coords.Value(), - roundInt(c.nodes.Rps(), 100), - c.nodes.Value(), - roundInt(c.ways.Rps(), 100), - c.ways.Value(), - roundInt(c.relations.Rps(), 10), - c.relations.Value(), + roundInt(c.Coords.Rps(), 1000), + fmtPercentOrVal(c.Coords.Progress(), c.Coords.Value()), + roundInt(c.Nodes.Rps(), 100), + fmtPercentOrVal(c.Nodes.Progress(), c.Nodes.Value()), + roundInt(c.Ways.Rps(), 100), + fmtPercentOrVal(c.Ways.Progress(), c.Ways.Value()), + roundInt(c.Relations.Rps(), 10), + fmtPercentOrVal(c.Relations.Progress(), c.Relations.Value()), ) } diff --git a/writer/ways.go b/writer/ways.go index 8342936..a203d03 100644 --- a/writer/ways.go +++ b/writer/ways.go @@ -45,10 +45,10 @@ func (ww *WayWriter) loop() { geos.SetHandleSrid(ww.srid) defer geos.Finish() for w := range ww.ways { + ww.progress.AddWays(1) if len(w.Tags) == 0 { continue } - ww.progress.AddWays(1) inserted, err := ww.osmCache.InsertedWays.IsInserted(w.Id) if err != nil { log.Println(err)