From f320a919ab5dd1f091c0e6a903d18e0670b14281 Mon Sep 17 00:00:00 2001 From: Oliver Tonnhofer Date: Thu, 23 May 2013 17:53:58 +0200 Subject: [PATCH] first work on logging framework --- geom/geos/geos.go | 12 ++-- geom/geos/geos_wrap.go | 4 +- goposm.go | 65 ++++++++++------- logging/logger.go | 160 +++++++++++++++++++++++++++++++++++++++++ stats/stats.go | 38 +++++----- 5 files changed, 228 insertions(+), 51 deletions(-) create mode 100644 logging/logger.go diff --git a/geom/geos/geos.go b/geom/geos/geos.go index 8412025..7349fe0 100644 --- a/geom/geos/geos.go +++ b/geom/geos/geos.go @@ -5,7 +5,7 @@ package geos #include "geos_c.h" #include -extern void goDebug(char *msg); +extern void goLogString(char *msg); extern void debug_wrap(const char *fmt, ...); extern GEOSContextHandle_t initGEOS_r_debug(); extern void initGEOS_debug(); @@ -13,14 +13,16 @@ extern void initGEOS_debug(); import "C" import ( - "fmt" + "goposm/logging" "runtime" "unsafe" ) -//export goDebug -func goDebug(msg *C.char) { - fmt.Println(C.GoString(msg)) +var log = logging.NewLogger("GEOS") + +//export goLogString +func goLogString(msg *C.char) { + log.Printf(C.GoString(msg)) } type GEOS struct { diff --git a/geom/geos/geos_wrap.go b/geom/geos/geos_wrap.go index f0a1ef4..a828997 100644 --- a/geom/geos/geos_wrap.go +++ b/geom/geos/geos_wrap.go @@ -7,7 +7,7 @@ package geos #include #include -extern void goDebug(char *msg); +extern void goLogString(char *msg); void debug_wrap(const char *fmt, ...) { va_list a_list; @@ -16,7 +16,7 @@ void debug_wrap(const char *fmt, ...) { char buf[100]; vsnprintf(buf, sizeof(buf), fmt, a_list); va_end(a_list); - goDebug((char *)&buf); + goLogString((char *)&buf); } GEOSContextHandle_t initGEOS_r_debug() { diff --git a/goposm.go b/goposm.go index 0ea8cc2..66dd1ba 100644 --- a/goposm.go +++ b/goposm.go @@ -5,11 +5,12 @@ import ( "goposm/cache" "goposm/database" _ "goposm/database/postgis" + "goposm/logging" "goposm/mapping" "goposm/reader" "goposm/stats" "goposm/writer" - "log" + golog "log" "os" "runtime" "runtime/pprof" @@ -20,6 +21,8 @@ import ( var dbImportBatchSize int64 +var log = logging.NewLogger("") + func init() { dbImportBatchSize, _ = strconv.ParseInt( os.Getenv("GOPOSM_DBIMPORT_BATCHSIZE"), 10, 32) @@ -45,15 +48,27 @@ var ( removeBackup = flag.Bool("removebackup", false, "remove backups from deploy") ) +func die(args ...interface{}) { + log.Fatal(args...) + logging.Shutdown() + os.Exit(1) +} + +func dief(msg string, args ...interface{}) { + log.Fatalf(msg, args...) + logging.Shutdown() + os.Exit(1) +} + func main() { - log.SetFlags(log.LstdFlags | log.Lshortfile) + golog.SetFlags(golog.LstdFlags | golog.Lshortfile) runtime.GOMAXPROCS(runtime.NumCPU()) flag.Parse() if *cpuprofile != "" { f, err := os.Create(*cpuprofile) if err != nil { - log.Fatal(err) + golog.Fatal(err) } pprof.StartCPUProfile(f) defer pprof.StopCPUProfile() @@ -69,7 +84,7 @@ func main() { var err error interval, err = time.ParseDuration(parts[1]) if err != nil { - log.Fatal(err) + golog.Fatal(err) } } @@ -77,30 +92,30 @@ func main() { } if (*write || *read != "") && (*revertDeploy || *removeBackup) { - log.Fatal("-revertdeploy and -removebackup not compatible with -read/-write") + die("-revertdeploy and -removebackup not compatible with -read/-write") } if *revertDeploy && (*removeBackup || *deployProduction) { - log.Fatal("-revertdeploy not compatible with -deployproduction/-removebackup") + die("-revertdeploy not compatible with -deployproduction/-removebackup") } osmCache := cache.NewOSMCache(*cachedir) if *read != "" && osmCache.Exists() { if *overwritecache { - log.Println("removing existing cache", *cachedir) + log.Printf("removing existing cache %s", *cachedir) err := osmCache.Remove() if err != nil { - log.Fatal("unable to remove cache:", err) + die("unable to remove cache:", err) } } else if !*appendcache { - log.Fatal("cache already exists use -appendcache or -overwritecache") + die("cache already exists use -appendcache or -overwritecache") } } err := osmCache.Open() if err != nil { - log.Fatal(err) + die(err) } defer osmCache.Close() @@ -108,7 +123,7 @@ func main() { tagmapping, err := mapping.NewMapping(*mappingFile) if err != nil { - log.Fatal(err) + die(err) } var db database.DB @@ -122,7 +137,7 @@ func main() { } db, err = database.Open(conf, tagmapping) if err != nil { - log.Fatal(err) + die(err) } } @@ -139,15 +154,15 @@ func main() { progress.Reset() err = db.Init() if err != nil { - log.Fatal(err) + die(err) } diffCache := cache.NewDiffCache(*cachedir) if err = diffCache.Remove(); err != nil { - log.Fatal(err) + die(err) } if err = diffCache.Open(); err != nil { - log.Fatal(err) + die(err) } insertBuffer := writer.NewInsertBuffer() @@ -181,48 +196,48 @@ func main() { if db, ok := db.(database.Generalizer); ok { if err := db.Generalize(); err != nil { - log.Fatal(err) + die(err) } } else { - log.Fatal("database not generalizeable") + die("database not generalizeable") } if db, ok := db.(database.Finisher); ok { if err := db.Finish(); err != nil { - log.Fatal(err) + die(err) } } else { - log.Fatal("database not finishable") + die("database not finishable") } } if *deployProduction { if db, ok := db.(database.Deployer); ok { if err := db.Deploy(); err != nil { - log.Fatal(err) + die(err) } } else { - log.Fatal("database not deployable") + die("database not deployable") } } if *revertDeploy { if db, ok := db.(database.Deployer); ok { if err := db.RevertDeploy(); err != nil { - log.Fatal(err) + die(err) } } else { - log.Fatal("database not deployable") + die("database not deployable") } } if *removeBackup { if db, ok := db.(database.Deployer); ok { if err := db.RemoveBackup(); err != nil { - log.Fatal(err) + die(err) } } else { - log.Fatal("database not deployable") + die("database not deployable") } } progress.Stop() diff --git a/logging/logger.go b/logging/logger.go new file mode 100644 index 0000000..da8ae32 --- /dev/null +++ b/logging/logger.go @@ -0,0 +1,160 @@ +package logging + +import ( + "fmt" + "math/rand" + "sync" + "time" +) + +type Level int + +const ( + FATAL Level = iota + ERROR + WARNING + INFO + DEBUG +) + +type Record struct { + Level Level + Component string + Message string +} + +const ( + CLEARLINE = "\x1b[2K" +) + +func Debugf(msg string, args ...interface{}) { + defaultLogBroker.Records <- Record{DEBUG, "", fmt.Sprintf(msg, args...)} +} + +func Infof(msg string, args ...interface{}) { + defaultLogBroker.Records <- Record{INFO, "", fmt.Sprintf(msg, args...)} +} + +func Warnf(msg string, args ...interface{}) { + defaultLogBroker.Records <- Record{WARNING, "", fmt.Sprintf(msg, args...)} +} + +func Errorf(msg string, args ...interface{}) { + defaultLogBroker.Records <- Record{ERROR, "", fmt.Sprintf(msg, args...)} +} + +func Fatalf(msg string, args ...interface{}) { + defaultLogBroker.Records <- Record{FATAL, "", fmt.Sprintf(msg, args...)} +} + +func Progress(msg string) { + defaultLogBroker.Progress <- msg +} + +type Logger struct { + Component string +} + +func (l *Logger) Printf(msg string, args ...interface{}) { + defaultLogBroker.Records <- Record{INFO, l.Component, fmt.Sprintf(msg, args...)} +} + +func (l *Logger) Fatal(args ...interface{}) { + defaultLogBroker.Records <- Record{FATAL, l.Component, fmt.Sprint(args...)} +} + +func (l *Logger) Fatalf(msg string, args ...interface{}) { + defaultLogBroker.Records <- Record{FATAL, l.Component, fmt.Sprintf(msg, args...)} +} + +func (l *Logger) Errorf(msg string, args ...interface{}) { + defaultLogBroker.Records <- Record{ERROR, l.Component, fmt.Sprintf(msg, args...)} +} + +func (l *Logger) Printfl(level Level, msg string, args ...interface{}) { + defaultLogBroker.Records <- Record{level, l.Component, fmt.Sprintf(msg, args...)} +} + +func NewLogger(component string) *Logger { + return &Logger{component} +} + +type LogBroker struct { + Records chan Record + Progress chan string + quit chan bool + wg *sync.WaitGroup +} + +func (l *LogBroker) loop() { + l.wg.Add(1) + newline := true + lastProgress := "" + for { + select { + case record := <-l.Records: + if !newline { + fmt.Print(CLEARLINE) + } + l.printRecord(record) + newline = true + if lastProgress != "" { + l.printProgress(lastProgress) + newline = false + } + case progress := <-l.Progress: + l.printProgress(progress) + lastProgress = progress + newline = false + case <-l.quit: + break + } + } + l.wg.Done() +} + +func (l *LogBroker) printPrefix() { + fmt.Print("[", time.Now().Format(time.Stamp), "] ") +} +func (l *LogBroker) printComponent(component string) { + if component != "" { + fmt.Print("[", component, "] ") + } +} + +func (l *LogBroker) printRecord(record Record) { + l.printPrefix() + l.printComponent(record.Component) + fmt.Println(record.Message) +} +func (l *LogBroker) printProgress(progress string) { + l.printPrefix() + fmt.Print(progress) +} + +func Shutdown() { + defaultLogBroker.quit <- true + defaultLogBroker.wg.Wait() +} + +var defaultLogBroker LogBroker + +func init() { + defaultLogBroker = LogBroker{ + Records: make(chan Record, 8), + Progress: make(chan string), + quit: make(chan bool), + wg: &sync.WaitGroup{}, + } + go defaultLogBroker.loop() +} + +// func init() { +// go func() { +// log := NewLogger("Tourette") +// for { +// time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond) +// log.Printfl(Level(rand.Intn(5)), "Bazinga") +// } +// }() +// } diff --git a/stats/stats.go b/stats/stats.go index db5f8e8..bf76f34 100644 --- a/stats/stats.go +++ b/stats/stats.go @@ -2,6 +2,7 @@ package stats import ( "fmt" + "goposm/logging" "time" ) @@ -96,7 +97,7 @@ func StatsReporter() *Statistics { s.messages = make(chan string) go func() { - tick := time.Tick(time.Second) + tick := time.Tick(500 * time.Millisecond) tock := time.Tick(time.Minute) for { select { @@ -133,27 +134,26 @@ func StatsReporter() *Statistics { } func (c *counter) PrintTick() { - fmt.Printf("\x1b[2K\r[%s] [%6s] C: %7d/s %7d/s (%10d) N: %7d/s %7d/s (%9d) W: %7d/s %7d/s (%8d) R: %6d/s %6d/s (%7d)", - c.start.Format(time.Stamp), - c.Duration(), - c.coords.Rps(1000), - c.coords.LastRps(1000), - c.coords.Value(), - c.nodes.Rps(100), - c.nodes.LastRps(100), - c.nodes.Value(), - c.ways.Rps(100), - c.ways.LastRps(100), - c.ways.Value(), - c.relations.Rps(10), - c.relations.LastRps(10), - c.relations.Value(), - ) + logging.Progress( + fmt.Sprintf("[%6s] C: %7d/s %7d/s (%10d) N: %7d/s %7d/s (%9d) W: %7d/s %7d/s (%8d) R: %6d/s %6d/s (%7d)\r", + c.Duration(), + c.coords.Rps(1000), + c.coords.LastRps(1000), + c.coords.Value(), + c.nodes.Rps(100), + c.nodes.LastRps(100), + c.nodes.Value(), + c.ways.Rps(100), + c.ways.LastRps(100), + c.ways.Value(), + c.relations.Rps(10), + c.relations.LastRps(10), + c.relations.Value(), + )) } func (c *counter) PrintStats() { - fmt.Printf("\x1b[2K\r[%s] [%6s] C: %7d/s (%10d) N: %7d/s (%9d) W: %7d/s (%8d) R: %6d/s (%7d)\n", - c.start.Format(time.Stamp), + logging.Infof("[%6s] C: %7d/s (%10d) N: %7d/s (%9d) W: %7d/s (%8d) R: %6d/s (%7d)", c.Duration(), c.coords.Rps(1000), c.coords.Value(),