parse pbf header block with timestamp; check required pbf features
parent
8fa52a13e7
commit
25cd7bfdc3
|
@ -5,20 +5,37 @@ import (
|
||||||
"code.google.com/p/goprotobuf/proto"
|
"code.google.com/p/goprotobuf/proto"
|
||||||
"compress/zlib"
|
"compress/zlib"
|
||||||
structs "encoding/binary"
|
structs "encoding/binary"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
"goposm/parser/pbf/osmpbf"
|
"goposm/parser/pbf/osmpbf"
|
||||||
"io"
|
"io"
|
||||||
"log"
|
"log"
|
||||||
"os"
|
"os"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
func readPrimitiveBlock(pos Block) *osmpbf.PrimitiveBlock {
|
type parserError struct {
|
||||||
|
message string
|
||||||
|
originalError error
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *parserError) Error() string {
|
||||||
|
return fmt.Sprintf("%s: %v", e.message, e.originalError)
|
||||||
|
}
|
||||||
|
|
||||||
|
func newParserError(message string, err error) *parserError {
|
||||||
|
return &parserError{message, err}
|
||||||
|
}
|
||||||
|
|
||||||
|
var supportedFeatured = map[string]bool{"OsmSchema-V0.6": true, "DenseNodes": true}
|
||||||
|
|
||||||
|
func readBlobData(pos Block) ([]byte, error) {
|
||||||
file, err := os.Open(pos.filename)
|
file, err := os.Open(pos.filename)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Panic(err)
|
return nil, newParserError("file open", err)
|
||||||
}
|
}
|
||||||
defer file.Close()
|
defer file.Close()
|
||||||
|
|
||||||
var block = &osmpbf.PrimitiveBlock{}
|
|
||||||
var blob = &osmpbf.Blob{}
|
var blob = &osmpbf.Blob{}
|
||||||
|
|
||||||
blobData := make([]byte, pos.size)
|
blobData := make([]byte, pos.size)
|
||||||
|
@ -26,7 +43,7 @@ func readPrimitiveBlock(pos Block) *osmpbf.PrimitiveBlock {
|
||||||
io.ReadFull(file, blobData)
|
io.ReadFull(file, blobData)
|
||||||
err = proto.Unmarshal(blobData, blob)
|
err = proto.Unmarshal(blobData, blob)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Panic("unmarshaling error blob: ", err)
|
return nil, newParserError("unmarshaling blob", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// pbf contains (uncompressed) raw or zlibdata
|
// pbf contains (uncompressed) raw or zlibdata
|
||||||
|
@ -35,15 +52,23 @@ func readPrimitiveBlock(pos Block) *osmpbf.PrimitiveBlock {
|
||||||
buf := bytes.NewBuffer(blob.GetZlibData())
|
buf := bytes.NewBuffer(blob.GetZlibData())
|
||||||
r, err := zlib.NewReader(buf)
|
r, err := zlib.NewReader(buf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Panic("zlib error: ", err)
|
return nil, newParserError("zlib error", err)
|
||||||
}
|
}
|
||||||
raw = make([]byte, blob.GetRawSize())
|
raw = make([]byte, blob.GetRawSize())
|
||||||
_, err = io.ReadFull(r, raw)
|
_, err = io.ReadFull(r, raw)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Panic("zlib read error: ", err)
|
return nil, newParserError("zlib read error", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return raw, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func readPrimitiveBlock(pos Block) *osmpbf.PrimitiveBlock {
|
||||||
|
raw, err := readBlobData(pos)
|
||||||
|
if err != nil {
|
||||||
|
log.Panic(err)
|
||||||
|
}
|
||||||
|
block := &osmpbf.PrimitiveBlock{}
|
||||||
err = proto.Unmarshal(raw, block)
|
err = proto.Unmarshal(raw, block)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Panic("unmarshaling error: ", err)
|
log.Panic("unmarshaling error: ", err)
|
||||||
|
@ -52,18 +77,52 @@ func readPrimitiveBlock(pos Block) *osmpbf.PrimitiveBlock {
|
||||||
return block
|
return block
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func readAndParseHeaderBlock(pos Block) (*pbfHeader, error) {
|
||||||
|
raw, err := readBlobData(pos)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
header := &osmpbf.HeaderBlock{}
|
||||||
|
err = proto.Unmarshal(raw, header)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, feature := range header.RequiredFeatures {
|
||||||
|
if supportedFeatured[feature] != true {
|
||||||
|
return nil, errors.New("cannot parse file, feature " + feature + " not supported")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
result := &pbfHeader{}
|
||||||
|
timestamp := header.GetOsmosisReplicationTimestamp()
|
||||||
|
result.Time = time.Unix(timestamp, 0 /* nanoseconds */)
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
type pbf struct {
|
type pbf struct {
|
||||||
file *os.File
|
file *os.File
|
||||||
filename string
|
filename string
|
||||||
offset int64
|
offset int64
|
||||||
|
Header *pbfHeader
|
||||||
}
|
}
|
||||||
|
|
||||||
func open(filename string) (f *pbf, err error) {
|
type pbfHeader struct {
|
||||||
|
Time time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
func Open(filename string) (f *pbf, err error) {
|
||||||
file, err := os.Open(filename)
|
file, err := os.Open(filename)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
f = &pbf{filename: filename, file: file}
|
f = &pbf{filename: filename, file: file}
|
||||||
|
err = f.parseHeader()
|
||||||
|
if err != nil {
|
||||||
|
file.Close()
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
return f, nil
|
return f, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -71,30 +130,39 @@ func (pbf *pbf) Close() error {
|
||||||
return pbf.file.Close()
|
return pbf.file.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pbf *pbf) NextDataPosition() (offset int64, size int32) {
|
func (pbf *pbf) parseHeader() error {
|
||||||
header := pbf.nextBlobHeader()
|
offset, size, header := pbf.nextBlock()
|
||||||
|
if header.GetType() != "OSMHeader" {
|
||||||
|
panic("invalid block type, expected OSMHeader, got " + header.GetType())
|
||||||
|
}
|
||||||
|
var err error
|
||||||
|
pbf.Header, err = readAndParseHeaderBlock(Block{pbf.filename, offset, size})
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (pbf *pbf) nextBlock() (offset int64, size int32, header *osmpbf.BlobHeader) {
|
||||||
|
header = pbf.nextBlobHeader()
|
||||||
size = header.GetDatasize()
|
size = header.GetDatasize()
|
||||||
offset = pbf.offset
|
offset = pbf.offset
|
||||||
|
|
||||||
pbf.offset += int64(size)
|
pbf.offset += int64(size)
|
||||||
pbf.file.Seek(pbf.offset, 0)
|
pbf.file.Seek(pbf.offset, 0)
|
||||||
|
return offset, size, header
|
||||||
if header.GetType() == "OSMHeader" {
|
|
||||||
return pbf.NextDataPosition()
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pbf *pbf) BlockPositions() (positions chan Block) {
|
func (pbf *pbf) BlockPositions() (positions chan Block) {
|
||||||
positions = make(chan Block, 8)
|
positions = make(chan Block, 8)
|
||||||
go func() {
|
go func() {
|
||||||
for {
|
for {
|
||||||
offset, size := pbf.NextDataPosition()
|
offset, size, header := pbf.nextBlock()
|
||||||
if size == 0 {
|
if size == 0 {
|
||||||
close(positions)
|
close(positions)
|
||||||
pbf.Close()
|
pbf.Close()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
if header.GetType() != "OSMData" {
|
||||||
|
panic("invalid block type, expected OSMData, got " + header.GetType())
|
||||||
|
}
|
||||||
positions <- Block{pbf.filename, offset, size}
|
positions <- Block{pbf.filename, offset, size}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
|
@ -201,7 +201,7 @@ func newStringTable(source *osmpbf.StringTable) stringTable {
|
||||||
}
|
}
|
||||||
|
|
||||||
func Blocks(filename string) chan Block {
|
func Blocks(filename string) chan Block {
|
||||||
pbf, err := open(filename)
|
pbf, err := Open(filename)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,6 +3,7 @@ package reader
|
||||||
import (
|
import (
|
||||||
"goposm/cache"
|
"goposm/cache"
|
||||||
"goposm/element"
|
"goposm/element"
|
||||||
|
"goposm/logging"
|
||||||
"goposm/mapping"
|
"goposm/mapping"
|
||||||
"goposm/parser/pbf"
|
"goposm/parser/pbf"
|
||||||
"goposm/stats"
|
"goposm/stats"
|
||||||
|
@ -13,6 +14,8 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var log = logging.NewLogger("reader")
|
||||||
|
|
||||||
var skipCoords, skipNodes, skipWays bool
|
var skipCoords, skipNodes, skipWays bool
|
||||||
var nParser, nWays, nRels, nNodes, nCoords int64
|
var nParser, nWays, nRels, nNodes, nCoords int64
|
||||||
|
|
||||||
|
@ -48,7 +51,16 @@ func ReadPbf(cache *cache.OSMCache, progress *stats.Statistics, tagmapping *mapp
|
||||||
ways := make(chan []element.Way, 4)
|
ways := make(chan []element.Way, 4)
|
||||||
relations := make(chan []element.Relation, 4)
|
relations := make(chan []element.Relation, 4)
|
||||||
|
|
||||||
blocks := pbf.Blocks(filename)
|
pbfFile, err := pbf.Open(filename)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if pbfFile.Header.Time.Unix() != 0 {
|
||||||
|
log.Printf("reading %s with data till %v", filename, pbfFile.Header.Time.Local())
|
||||||
|
}
|
||||||
|
|
||||||
|
blocks := pbfFile.BlockPositions()
|
||||||
|
|
||||||
waitParser := sync.WaitGroup{}
|
waitParser := sync.WaitGroup{}
|
||||||
for i := 0; int64(i) < nParser; i++ {
|
for i := 0; int64(i) < nParser; i++ {
|
||||||
|
|
Loading…
Reference in New Issue