Merge branch 'master' into singletransaction
commit
4e921b723d
|
@ -91,17 +91,20 @@ type DeltaCoordsCache struct {
|
|||
capacity int64
|
||||
linearImport bool
|
||||
mu sync.Mutex
|
||||
bunchSize int64
|
||||
}
|
||||
|
||||
func NewDeltaCoordsCache(path string) (*DeltaCoordsCache, error) {
|
||||
coordsCache := DeltaCoordsCache{}
|
||||
coordsCache.options = &osmCacheOptions.Coords.CacheOptions
|
||||
err := coordsCache.open(path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
coordsCache.bunchSize = int64(osmCacheOptions.Coords.BunchSize)
|
||||
coordsCache.lruList = list.New()
|
||||
coordsCache.table = make(map[int64]*CoordsBunch)
|
||||
// mem req for cache approx. capacity*deltaCacheBunchSize*40
|
||||
// mem req for cache approx. capacity*bunchSize*40
|
||||
coordsCache.capacity = 1024
|
||||
return &coordsCache, nil
|
||||
}
|
||||
|
@ -123,7 +126,7 @@ func (self *DeltaCoordsCache) Close() {
|
|||
}
|
||||
|
||||
func (self *DeltaCoordsCache) GetCoord(id int64) (*element.Node, error) {
|
||||
bunchId := getBunchId(id)
|
||||
bunchId := self.getBunchId(id)
|
||||
bunch, err := self.getBunch(bunchId)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -144,7 +147,7 @@ func (self *DeltaCoordsCache) FillWay(way *element.Way) error {
|
|||
lastBunchId = -1
|
||||
|
||||
for i, id := range way.Refs {
|
||||
bunchId = getBunchId(id)
|
||||
bunchId = self.getBunchId(id)
|
||||
// re-use bunches
|
||||
if bunchId != lastBunchId {
|
||||
if bunch != nil {
|
||||
|
@ -174,15 +177,15 @@ func (self *DeltaCoordsCache) FillWay(way *element.Way) error {
|
|||
// nodes need to be sorted by Id.
|
||||
func (self *DeltaCoordsCache) PutCoords(nodes []element.Node) error {
|
||||
var start, currentBunchId int64
|
||||
currentBunchId = getBunchId(nodes[0].Id)
|
||||
currentBunchId = self.getBunchId(nodes[0].Id)
|
||||
start = 0
|
||||
totalNodes := len(nodes)
|
||||
for i, node := range nodes {
|
||||
bunchId := getBunchId(node.Id)
|
||||
bunchId := self.getBunchId(node.Id)
|
||||
if bunchId != currentBunchId {
|
||||
if self.linearImport && int64(i) > deltaCacheBunchSize && int64(i) < int64(totalNodes)-deltaCacheBunchSize {
|
||||
if self.linearImport && int64(i) > self.bunchSize && int64(i) < int64(totalNodes)-self.bunchSize {
|
||||
// no need to handle concurrent updates to the same
|
||||
// bunch if we are not at the boundary of a deltaCacheBunchSize
|
||||
// bunch if we are not at the boundary of a self.bunchSize
|
||||
self.putCoordsPacked(currentBunchId, nodes[start:i])
|
||||
} else {
|
||||
bunch, err := self.getBunch(currentBunchId)
|
||||
|
@ -259,8 +262,8 @@ func (p *DeltaCoordsCache) getCoordsPacked(bunchId int64, nodes []element.Node)
|
|||
return nodes, nil
|
||||
}
|
||||
|
||||
func getBunchId(nodeId int64) int64 {
|
||||
return nodeId / deltaCacheBunchSize
|
||||
func (self *DeltaCoordsCache) getBunchId(nodeId int64) int64 {
|
||||
return nodeId / self.bunchSize
|
||||
}
|
||||
|
||||
var (
|
||||
|
@ -277,7 +280,7 @@ func (self *DeltaCoordsCache) getBunch(bunchId int64) (*CoordsBunch, error) {
|
|||
select {
|
||||
case nodes = <-freeNodes:
|
||||
default:
|
||||
nodes = make([]element.Node, 0, deltaCacheBunchSize)
|
||||
nodes = make([]element.Node, 0, self.bunchSize)
|
||||
}
|
||||
bunch = &CoordsBunch{id: bunchId, coords: nil, elem: elem}
|
||||
needsGet = true
|
||||
|
|
|
@ -108,8 +108,9 @@ type idRef struct {
|
|||
|
||||
const cacheSize = 4096
|
||||
|
||||
func NewRefIndex(path string) (*RefIndex, error) {
|
||||
func NewRefIndex(path string, opts *CacheOptions) (*RefIndex, error) {
|
||||
index := RefIndex{}
|
||||
index.options = opts
|
||||
err := index.open(path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -129,13 +130,13 @@ func NewRefIndex(path string) (*RefIndex, error) {
|
|||
}
|
||||
|
||||
func NewCoordsRefIndex(dir string) (*CoordsRefIndex, error) {
|
||||
cache, err := NewRefIndex(dir)
|
||||
cache, err := NewRefIndex(dir, &osmCacheOptions.CoordsIndex)
|
||||
refIdx, err := &CoordsRefIndex{*cache}, err
|
||||
return refIdx, err
|
||||
}
|
||||
|
||||
func NewWaysRefIndex(dir string) (*WaysRefIndex, error) {
|
||||
cache, err := NewRefIndex(dir)
|
||||
cache, err := NewRefIndex(dir, &osmCacheOptions.WaysIndex)
|
||||
return &WaysRefIndex{*cache}, err
|
||||
}
|
||||
|
||||
|
|
|
@ -12,6 +12,7 @@ type NodesCache struct {
|
|||
|
||||
func NewNodesCache(path string) (*NodesCache, error) {
|
||||
cache := NodesCache{}
|
||||
cache.options = &osmCacheOptions.Nodes
|
||||
err := cache.open(path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
|
@ -6,33 +6,8 @@ import (
|
|||
"github.com/jmhodges/levigo"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
)
|
||||
|
||||
var levelDbWriteBufferSize, levelDbWriteBlockSize int64
|
||||
var deltaCacheBunchSize int64
|
||||
|
||||
func init() {
|
||||
levelDbWriteBufferSize, _ = strconv.ParseInt(
|
||||
os.Getenv("GOPOSM_LEVELDB_BUFFERSIZE"), 10, 32)
|
||||
levelDbWriteBlockSize, _ = strconv.ParseInt(
|
||||
os.Getenv("GOPOSM_LEVELDB_BLOCKSIZE"), 10, 32)
|
||||
|
||||
// bunchSize defines how many coordinates should be stored in a
|
||||
// single record. This is the maximum and a bunch will typically contain
|
||||
// less coordinates (e.g. when nodes are removed from OSM or when you
|
||||
// are working with an OSM extract).
|
||||
//
|
||||
// A higher number improves -read mode (writing the cache) but also
|
||||
// increases the overhead during -write mode (reading coords).
|
||||
deltaCacheBunchSize, _ = strconv.ParseInt(
|
||||
os.Getenv("GOPOSM_DELTACACHE_BUNCHSIZE"), 10, 32)
|
||||
|
||||
if deltaCacheBunchSize == 0 {
|
||||
deltaCacheBunchSize = 128
|
||||
}
|
||||
}
|
||||
|
||||
var (
|
||||
NotFound = errors.New("not found")
|
||||
)
|
||||
|
@ -149,25 +124,33 @@ func (c *OSMCache) Remove() error {
|
|||
}
|
||||
|
||||
type Cache struct {
|
||||
db *levigo.DB
|
||||
wo *levigo.WriteOptions
|
||||
ro *levigo.ReadOptions
|
||||
db *levigo.DB
|
||||
options *CacheOptions
|
||||
cache *levigo.Cache
|
||||
wo *levigo.WriteOptions
|
||||
ro *levigo.ReadOptions
|
||||
}
|
||||
|
||||
func (c *Cache) open(path string) error {
|
||||
opts := levigo.NewOptions()
|
||||
opts.SetCache(levigo.NewLRUCache(1024 * 1024 * 8))
|
||||
opts.SetCreateIfMissing(true)
|
||||
opts.SetMaxOpenFiles(64)
|
||||
// save a few bytes by allowing leveldb to use delta enconding
|
||||
// for up to n keys (instead of only 16)
|
||||
opts.SetBlockRestartInterval(128)
|
||||
if levelDbWriteBufferSize != 0 {
|
||||
opts.SetWriteBufferSize(int(levelDbWriteBufferSize))
|
||||
if c.options.CacheSizeM > 0 {
|
||||
c.cache = levigo.NewLRUCache(c.options.CacheSizeM * 1024 * 1024)
|
||||
opts.SetCache(c.cache)
|
||||
}
|
||||
if levelDbWriteBlockSize != 0 {
|
||||
opts.SetBlockSize(int(levelDbWriteBlockSize))
|
||||
if c.options.MaxOpenFiles > 0 {
|
||||
opts.SetMaxOpenFiles(c.options.MaxOpenFiles)
|
||||
}
|
||||
if c.options.BlockRestartInterval > 0 {
|
||||
opts.SetBlockRestartInterval(c.options.BlockRestartInterval)
|
||||
}
|
||||
if c.options.WriteBufferSizeM > 0 {
|
||||
opts.SetWriteBufferSize(c.options.WriteBufferSizeM * 1024 * 1024)
|
||||
}
|
||||
if c.options.BlockSizeK > 0 {
|
||||
opts.SetBlockSize(c.options.BlockSizeK * 1024 * 1024)
|
||||
}
|
||||
|
||||
db, err := levigo.Open(path, opts)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -188,6 +171,9 @@ func idFromKeyBuf(buf []byte) int64 {
|
|||
return int64(bin.BigEndian.Uint64(buf))
|
||||
}
|
||||
|
||||
func (p *Cache) Close() {
|
||||
p.db.Close()
|
||||
func (c *Cache) Close() {
|
||||
c.db.Close()
|
||||
if c.cache != nil {
|
||||
c.cache.Close()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -12,6 +12,7 @@ type RelationsCache struct {
|
|||
|
||||
func NewRelationsCache(path string) (*RelationsCache, error) {
|
||||
cache := RelationsCache{}
|
||||
cache.options = &osmCacheOptions.Relations
|
||||
err := cache.open(path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
|
@ -12,6 +12,7 @@ type WaysCache struct {
|
|||
|
||||
func NewWaysCache(path string) (*WaysCache, error) {
|
||||
cache := WaysCache{}
|
||||
cache.options = &osmCacheOptions.Ways
|
||||
err := cache.open(path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -104,6 +105,8 @@ type InsertedWaysCache struct {
|
|||
|
||||
func NewInsertedWaysCache(path string) (*InsertedWaysCache, error) {
|
||||
cache := InsertedWaysCache{}
|
||||
cache.options = &osmCacheOptions.InsertedWays
|
||||
|
||||
err := cache.open(path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
12
goposm.go
12
goposm.go
|
@ -15,24 +15,12 @@ import (
|
|||
"os"
|
||||
"runtime"
|
||||
"runtime/pprof"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
var dbImportBatchSize int64
|
||||
|
||||
var log = logging.NewLogger("")
|
||||
|
||||
func init() {
|
||||
dbImportBatchSize, _ = strconv.ParseInt(
|
||||
os.Getenv("GOPOSM_DBIMPORT_BATCHSIZE"), 10, 32)
|
||||
|
||||
if dbImportBatchSize == 0 {
|
||||
dbImportBatchSize = 4096
|
||||
}
|
||||
}
|
||||
|
||||
var (
|
||||
cpuprofile = flag.String("cpuprofile", "", "filename of cpu profile output")
|
||||
httpprofile = flag.String("httpprofile", "", "bind address for profile server")
|
||||
|
|
Loading…
Reference in New Issue