diff --git a/parser/pbf/lowlevel.go b/parser/pbf/lowlevel.go index a569e1e..436377a 100644 --- a/parser/pbf/lowlevel.go +++ b/parser/pbf/lowlevel.go @@ -5,20 +5,37 @@ import ( "code.google.com/p/goprotobuf/proto" "compress/zlib" structs "encoding/binary" + "errors" + "fmt" "goposm/parser/pbf/osmpbf" "io" "log" "os" + "time" ) -func readPrimitiveBlock(pos Block) *osmpbf.PrimitiveBlock { +type parserError struct { + message string + originalError error +} + +func (e *parserError) Error() string { + return fmt.Sprintf("%s: %v", e.message, e.originalError) +} + +func newParserError(message string, err error) *parserError { + return &parserError{message, err} +} + +var supportedFeatured = map[string]bool{"OsmSchema-V0.6": true, "DenseNodes": true} + +func readBlobData(pos Block) ([]byte, error) { file, err := os.Open(pos.filename) if err != nil { - log.Panic(err) + return nil, newParserError("file open", err) } defer file.Close() - var block = &osmpbf.PrimitiveBlock{} var blob = &osmpbf.Blob{} blobData := make([]byte, pos.size) @@ -26,7 +43,7 @@ func readPrimitiveBlock(pos Block) *osmpbf.PrimitiveBlock { io.ReadFull(file, blobData) err = proto.Unmarshal(blobData, blob) if err != nil { - log.Panic("unmarshaling error blob: ", err) + return nil, newParserError("unmarshaling blob", err) } // pbf contains (uncompressed) raw or zlibdata @@ -35,15 +52,23 @@ func readPrimitiveBlock(pos Block) *osmpbf.PrimitiveBlock { buf := bytes.NewBuffer(blob.GetZlibData()) r, err := zlib.NewReader(buf) if err != nil { - log.Panic("zlib error: ", err) + return nil, newParserError("zlib error", err) } raw = make([]byte, blob.GetRawSize()) _, err = io.ReadFull(r, raw) if err != nil { - log.Panic("zlib read error: ", err) + return nil, newParserError("zlib read error", err) } } + return raw, nil +} +func readPrimitiveBlock(pos Block) *osmpbf.PrimitiveBlock { + raw, err := readBlobData(pos) + if err != nil { + log.Panic(err) + } + block := &osmpbf.PrimitiveBlock{} err = proto.Unmarshal(raw, block) if err != nil { log.Panic("unmarshaling error: ", err) @@ -52,18 +77,52 @@ func readPrimitiveBlock(pos Block) *osmpbf.PrimitiveBlock { return block } +func readAndParseHeaderBlock(pos Block) (*pbfHeader, error) { + raw, err := readBlobData(pos) + if err != nil { + return nil, err + } + + header := &osmpbf.HeaderBlock{} + err = proto.Unmarshal(raw, header) + if err != nil { + return nil, err + } + + for _, feature := range header.RequiredFeatures { + if supportedFeatured[feature] != true { + return nil, errors.New("cannot parse file, feature " + feature + " not supported") + } + } + + result := &pbfHeader{} + timestamp := header.GetOsmosisReplicationTimestamp() + result.Time = time.Unix(timestamp, 0 /* nanoseconds */) + return result, nil +} + type pbf struct { file *os.File filename string offset int64 + Header *pbfHeader } -func open(filename string) (f *pbf, err error) { +type pbfHeader struct { + Time time.Time +} + +func Open(filename string) (f *pbf, err error) { file, err := os.Open(filename) if err != nil { return nil, err } f = &pbf{filename: filename, file: file} + err = f.parseHeader() + if err != nil { + file.Close() + return nil, err + } return f, nil } @@ -71,30 +130,39 @@ func (pbf *pbf) Close() error { return pbf.file.Close() } -func (pbf *pbf) NextDataPosition() (offset int64, size int32) { - header := pbf.nextBlobHeader() +func (pbf *pbf) parseHeader() error { + offset, size, header := pbf.nextBlock() + if header.GetType() != "OSMHeader" { + panic("invalid block type, expected OSMHeader, got " + header.GetType()) + } + var err error + pbf.Header, err = readAndParseHeaderBlock(Block{pbf.filename, offset, size}) + return err +} + +func (pbf *pbf) nextBlock() (offset int64, size int32, header *osmpbf.BlobHeader) { + header = pbf.nextBlobHeader() size = header.GetDatasize() offset = pbf.offset pbf.offset += int64(size) pbf.file.Seek(pbf.offset, 0) - - if header.GetType() == "OSMHeader" { - return pbf.NextDataPosition() - } - return + return offset, size, header } func (pbf *pbf) BlockPositions() (positions chan Block) { positions = make(chan Block, 8) go func() { for { - offset, size := pbf.NextDataPosition() + offset, size, header := pbf.nextBlock() if size == 0 { close(positions) pbf.Close() return } + if header.GetType() != "OSMData" { + panic("invalid block type, expected OSMData, got " + header.GetType()) + } positions <- Block{pbf.filename, offset, size} } }() diff --git a/parser/pbf/pbf.go b/parser/pbf/pbf.go index 230a4d2..3313853 100644 --- a/parser/pbf/pbf.go +++ b/parser/pbf/pbf.go @@ -201,7 +201,7 @@ func newStringTable(source *osmpbf.StringTable) stringTable { } func Blocks(filename string) chan Block { - pbf, err := open(filename) + pbf, err := Open(filename) if err != nil { log.Fatal(err) } diff --git a/reader/reader.go b/reader/reader.go index 533839a..0491a70 100644 --- a/reader/reader.go +++ b/reader/reader.go @@ -3,6 +3,7 @@ package reader import ( "goposm/cache" "goposm/element" + "goposm/logging" "goposm/mapping" "goposm/parser/pbf" "goposm/stats" @@ -13,6 +14,8 @@ import ( "sync" ) +var log = logging.NewLogger("reader") + var skipCoords, skipNodes, skipWays bool var nParser, nWays, nRels, nNodes, nCoords int64 @@ -48,7 +51,16 @@ func ReadPbf(cache *cache.OSMCache, progress *stats.Statistics, tagmapping *mapp ways := make(chan []element.Way, 4) relations := make(chan []element.Relation, 4) - blocks := pbf.Blocks(filename) + pbfFile, err := pbf.Open(filename) + if err != nil { + log.Fatal(err) + } + + if pbfFile.Header.Time.Unix() != 0 { + log.Printf("reading %s with data till %v", filename, pbfFile.Header.Time.Local()) + } + + blocks := pbfFile.BlockPositions() waitParser := sync.WaitGroup{} for i := 0; int64(i) < nParser; i++ {