diff --git a/cache/delta.go b/cache/delta.go index d3ffc92..ae3e3c6 100644 --- a/cache/delta.go +++ b/cache/delta.go @@ -91,17 +91,20 @@ type DeltaCoordsCache struct { capacity int64 linearImport bool mu sync.Mutex + bunchSize int64 } func NewDeltaCoordsCache(path string) (*DeltaCoordsCache, error) { coordsCache := DeltaCoordsCache{} + coordsCache.options = &osmCacheOptions.Coords.CacheOptions err := coordsCache.open(path) if err != nil { return nil, err } + coordsCache.bunchSize = int64(osmCacheOptions.Coords.BunchSize) coordsCache.lruList = list.New() coordsCache.table = make(map[int64]*CoordsBunch) - // mem req for cache approx. capacity*deltaCacheBunchSize*40 + // mem req for cache approx. capacity*bunchSize*40 coordsCache.capacity = 1024 return &coordsCache, nil } @@ -123,7 +126,7 @@ func (self *DeltaCoordsCache) Close() { } func (self *DeltaCoordsCache) GetCoord(id int64) (*element.Node, error) { - bunchId := getBunchId(id) + bunchId := self.getBunchId(id) bunch, err := self.getBunch(bunchId) if err != nil { return nil, err @@ -144,7 +147,7 @@ func (self *DeltaCoordsCache) FillWay(way *element.Way) error { lastBunchId = -1 for i, id := range way.Refs { - bunchId = getBunchId(id) + bunchId = self.getBunchId(id) // re-use bunches if bunchId != lastBunchId { if bunch != nil { @@ -174,15 +177,15 @@ func (self *DeltaCoordsCache) FillWay(way *element.Way) error { // nodes need to be sorted by Id. func (self *DeltaCoordsCache) PutCoords(nodes []element.Node) error { var start, currentBunchId int64 - currentBunchId = getBunchId(nodes[0].Id) + currentBunchId = self.getBunchId(nodes[0].Id) start = 0 totalNodes := len(nodes) for i, node := range nodes { - bunchId := getBunchId(node.Id) + bunchId := self.getBunchId(node.Id) if bunchId != currentBunchId { - if self.linearImport && int64(i) > deltaCacheBunchSize && int64(i) < int64(totalNodes)-deltaCacheBunchSize { + if self.linearImport && int64(i) > self.bunchSize && int64(i) < int64(totalNodes)-self.bunchSize { // no need to handle concurrent updates to the same - // bunch if we are not at the boundary of a deltaCacheBunchSize + // bunch if we are not at the boundary of a self.bunchSize self.putCoordsPacked(currentBunchId, nodes[start:i]) } else { bunch, err := self.getBunch(currentBunchId) @@ -259,8 +262,8 @@ func (p *DeltaCoordsCache) getCoordsPacked(bunchId int64, nodes []element.Node) return nodes, nil } -func getBunchId(nodeId int64) int64 { - return nodeId / deltaCacheBunchSize +func (self *DeltaCoordsCache) getBunchId(nodeId int64) int64 { + return nodeId / self.bunchSize } var ( @@ -277,7 +280,7 @@ func (self *DeltaCoordsCache) getBunch(bunchId int64) (*CoordsBunch, error) { select { case nodes = <-freeNodes: default: - nodes = make([]element.Node, 0, deltaCacheBunchSize) + nodes = make([]element.Node, 0, self.bunchSize) } bunch = &CoordsBunch{id: bunchId, coords: nil, elem: elem} needsGet = true diff --git a/cache/diff.go b/cache/diff.go index f987ced..82dd524 100644 --- a/cache/diff.go +++ b/cache/diff.go @@ -108,8 +108,9 @@ type idRef struct { const cacheSize = 4096 -func NewRefIndex(path string) (*RefIndex, error) { +func NewRefIndex(path string, opts *CacheOptions) (*RefIndex, error) { index := RefIndex{} + index.options = opts err := index.open(path) if err != nil { return nil, err @@ -129,13 +130,13 @@ func NewRefIndex(path string) (*RefIndex, error) { } func NewCoordsRefIndex(dir string) (*CoordsRefIndex, error) { - cache, err := NewRefIndex(dir) + cache, err := NewRefIndex(dir, &osmCacheOptions.CoordsIndex) refIdx, err := &CoordsRefIndex{*cache}, err return refIdx, err } func NewWaysRefIndex(dir string) (*WaysRefIndex, error) { - cache, err := NewRefIndex(dir) + cache, err := NewRefIndex(dir, &osmCacheOptions.WaysIndex) return &WaysRefIndex{*cache}, err } diff --git a/cache/nodes.go b/cache/nodes.go index 4787bd5..eb30272 100644 --- a/cache/nodes.go +++ b/cache/nodes.go @@ -12,6 +12,7 @@ type NodesCache struct { func NewNodesCache(path string) (*NodesCache, error) { cache := NodesCache{} + cache.options = &osmCacheOptions.Nodes err := cache.open(path) if err != nil { return nil, err diff --git a/cache/osm.go b/cache/osm.go index 03d5833..620bafb 100644 --- a/cache/osm.go +++ b/cache/osm.go @@ -6,33 +6,8 @@ import ( "github.com/jmhodges/levigo" "os" "path/filepath" - "strconv" ) -var levelDbWriteBufferSize, levelDbWriteBlockSize int64 -var deltaCacheBunchSize int64 - -func init() { - levelDbWriteBufferSize, _ = strconv.ParseInt( - os.Getenv("GOPOSM_LEVELDB_BUFFERSIZE"), 10, 32) - levelDbWriteBlockSize, _ = strconv.ParseInt( - os.Getenv("GOPOSM_LEVELDB_BLOCKSIZE"), 10, 32) - - // bunchSize defines how many coordinates should be stored in a - // single record. This is the maximum and a bunch will typically contain - // less coordinates (e.g. when nodes are removed from OSM or when you - // are working with an OSM extract). - // - // A higher number improves -read mode (writing the cache) but also - // increases the overhead during -write mode (reading coords). - deltaCacheBunchSize, _ = strconv.ParseInt( - os.Getenv("GOPOSM_DELTACACHE_BUNCHSIZE"), 10, 32) - - if deltaCacheBunchSize == 0 { - deltaCacheBunchSize = 128 - } -} - var ( NotFound = errors.New("not found") ) @@ -149,25 +124,33 @@ func (c *OSMCache) Remove() error { } type Cache struct { - db *levigo.DB - wo *levigo.WriteOptions - ro *levigo.ReadOptions + db *levigo.DB + options *CacheOptions + cache *levigo.Cache + wo *levigo.WriteOptions + ro *levigo.ReadOptions } func (c *Cache) open(path string) error { opts := levigo.NewOptions() - opts.SetCache(levigo.NewLRUCache(1024 * 1024 * 8)) opts.SetCreateIfMissing(true) - opts.SetMaxOpenFiles(64) - // save a few bytes by allowing leveldb to use delta enconding - // for up to n keys (instead of only 16) - opts.SetBlockRestartInterval(128) - if levelDbWriteBufferSize != 0 { - opts.SetWriteBufferSize(int(levelDbWriteBufferSize)) + if c.options.CacheSizeM > 0 { + c.cache = levigo.NewLRUCache(c.options.CacheSizeM * 1024 * 1024) + opts.SetCache(c.cache) } - if levelDbWriteBlockSize != 0 { - opts.SetBlockSize(int(levelDbWriteBlockSize)) + if c.options.MaxOpenFiles > 0 { + opts.SetMaxOpenFiles(c.options.MaxOpenFiles) } + if c.options.BlockRestartInterval > 0 { + opts.SetBlockRestartInterval(c.options.BlockRestartInterval) + } + if c.options.WriteBufferSizeM > 0 { + opts.SetWriteBufferSize(c.options.WriteBufferSizeM * 1024 * 1024) + } + if c.options.BlockSizeK > 0 { + opts.SetBlockSize(c.options.BlockSizeK * 1024 * 1024) + } + db, err := levigo.Open(path, opts) if err != nil { return err @@ -188,6 +171,9 @@ func idFromKeyBuf(buf []byte) int64 { return int64(bin.BigEndian.Uint64(buf)) } -func (p *Cache) Close() { - p.db.Close() +func (c *Cache) Close() { + c.db.Close() + if c.cache != nil { + c.cache.Close() + } } diff --git a/cache/relations.go b/cache/relations.go index d7192fe..d066b84 100644 --- a/cache/relations.go +++ b/cache/relations.go @@ -12,6 +12,7 @@ type RelationsCache struct { func NewRelationsCache(path string) (*RelationsCache, error) { cache := RelationsCache{} + cache.options = &osmCacheOptions.Relations err := cache.open(path) if err != nil { return nil, err diff --git a/cache/ways.go b/cache/ways.go index cfdd6f1..b5a4bf8 100644 --- a/cache/ways.go +++ b/cache/ways.go @@ -12,6 +12,7 @@ type WaysCache struct { func NewWaysCache(path string) (*WaysCache, error) { cache := WaysCache{} + cache.options = &osmCacheOptions.Ways err := cache.open(path) if err != nil { return nil, err @@ -104,6 +105,8 @@ type InsertedWaysCache struct { func NewInsertedWaysCache(path string) (*InsertedWaysCache, error) { cache := InsertedWaysCache{} + cache.options = &osmCacheOptions.InsertedWays + err := cache.open(path) if err != nil { return nil, err diff --git a/goposm.go b/goposm.go index 66aa5f1..86f27dc 100644 --- a/goposm.go +++ b/goposm.go @@ -15,24 +15,12 @@ import ( "os" "runtime" "runtime/pprof" - "strconv" "strings" "time" ) -var dbImportBatchSize int64 - var log = logging.NewLogger("") -func init() { - dbImportBatchSize, _ = strconv.ParseInt( - os.Getenv("GOPOSM_DBIMPORT_BATCHSIZE"), 10, 32) - - if dbImportBatchSize == 0 { - dbImportBatchSize = 4096 - } -} - var ( cpuprofile = flag.String("cpuprofile", "", "filename of cpu profile output") httpprofile = flag.String("httpprofile", "", "bind address for profile server")