imposm3/cache/db.go

560 lines
11 KiB
Go
Raw Normal View History

2013-02-12 22:45:49 +04:00
package cache
import (
"bytes"
2013-02-12 22:45:49 +04:00
bin "encoding/binary"
"errors"
2013-02-12 22:45:49 +04:00
"github.com/jmhodges/levigo"
2013-05-17 17:10:59 +04:00
"goposm/cache/binary"
2013-02-12 22:45:49 +04:00
"goposm/element"
"os"
2013-05-04 18:27:05 +04:00
"path/filepath"
2013-05-07 13:42:27 +04:00
"strconv"
2013-02-12 22:45:49 +04:00
)
2013-05-07 13:42:27 +04:00
var (
	// levelDbWriteBufferSize and levelDbWriteBlockSize override the LevelDB
	// write buffer size and block size when they are non-zero. They are read
	// from GOPOSM_LEVELDB_BUFFERSIZE and GOPOSM_LEVELDB_BLOCKSIZE.
	levelDbWriteBufferSize, levelDbWriteBlockSize int64

	// deltaCacheBunchSize defines how many coordinates should be stored in a
	// single record. This is the maximum and a bunch will typically contain
	// fewer coordinates (e.g. when nodes are removed).
	//
	// A higher number improves -read mode (writing the cache) but also
	// increases the overhead during -write mode (reading coords).
	//
	// It is read from GOPOSM_DELTACACHE_BUNCHSIZE and defaults to 128.
	deltaCacheBunchSize int64
)

// envPositiveInt32 parses the environment variable name as a base-10 integer
// that fits into 32 bits. It returns fallback when the variable is unset,
// empty, malformed, out of range or not positive, so that a broken setting
// can never leak a clamped or negative value into the configuration.
func envPositiveInt32(name string, fallback int64) int64 {
	v, err := strconv.ParseInt(os.Getenv(name), 10, 32)
	if err != nil || v <= 0 {
		return fallback
	}
	return v
}

// init loads the tuning parameters from the environment. A value of zero for
// the LevelDB sizes means "keep the LevelDB default".
func init() {
	levelDbWriteBufferSize = envPositiveInt32("GOPOSM_LEVELDB_BUFFERSIZE", 0)
	levelDbWriteBlockSize = envPositiveInt32("GOPOSM_LEVELDB_BLOCKSIZE", 0)
	deltaCacheBunchSize = envPositiveInt32("GOPOSM_DELTACACHE_BUNCHSIZE", 128)
}
// NotFound is returned by the Get* methods of the caches when no record is
// stored under the requested id.
var NotFound = errors.New("not found")
2013-05-04 18:27:05 +04:00
type OSMCache struct {
Dir string
Coords *DeltaCoordsCache
Ways *WaysCache
Nodes *NodesCache
Relations *RelationsCache
InsertedWays *InsertedWaysCache
opened bool
2013-05-04 18:27:05 +04:00
}
// Close closes every cache that is currently open and resets the cache to
// its unopened state. It is safe to call on a partially opened cache; caches
// that were never opened (nil) are skipped.
func (c *OSMCache) Close() {
	if c.Coords != nil {
		c.Coords.close()
		c.Coords = nil
	}
	if c.Nodes != nil {
		c.Nodes.close()
		c.Nodes = nil
	}
	if c.Ways != nil {
		c.Ways.close()
		c.Ways = nil
	}
	if c.Relations != nil {
		c.Relations.close()
		c.Relations = nil
	}
	// InsertedWays is opened by Open like the other caches and has to be
	// released here as well.
	if c.InsertedWays != nil {
		c.InsertedWays.close()
		c.InsertedWays = nil
	}
	// Without this reset Exists keeps reporting true after Remove.
	c.opened = false
}
func NewOSMCache(dir string) *OSMCache {
2013-05-04 18:27:05 +04:00
cache := &OSMCache{Dir: dir}
return cache
}
func (c *OSMCache) Open() error {
2013-05-17 12:28:32 +04:00
err := os.MkdirAll(c.Dir, 0755)
if err != nil {
return err
}
c.Coords, err = NewDeltaCoordsCache(filepath.Join(c.Dir, "coords"))
2013-05-04 18:27:05 +04:00
if err != nil {
return err
2013-05-04 18:27:05 +04:00
}
c.Nodes, err = NewNodesCache(filepath.Join(c.Dir, "nodes"))
2013-05-04 18:27:05 +04:00
if err != nil {
c.Close()
return err
2013-05-04 18:27:05 +04:00
}
c.Ways, err = NewWaysCache(filepath.Join(c.Dir, "ways"))
2013-05-04 18:27:05 +04:00
if err != nil {
c.Close()
return err
2013-05-04 18:27:05 +04:00
}
c.Relations, err = NewRelationsCache(filepath.Join(c.Dir, "relations"))
2013-05-04 18:27:05 +04:00
if err != nil {
c.Close()
return err
}
c.InsertedWays, err = NewInsertedWaysCache(filepath.Join(c.Dir, "inserted_ways"))
if err != nil {
c.Close()
return err
}
c.opened = true
return nil
}
// Exists reports whether the cache is opened or whether at least one of the
// cache subdirectories is present below Dir. Any Stat result other than
// "does not exist" counts as present.
func (c *OSMCache) Exists() bool {
	if c.opened {
		return true
	}
	for _, name := range []string{"coords", "nodes", "ways", "relations", "inserted_ways"} {
		_, err := os.Stat(filepath.Join(c.Dir, name))
		if !os.IsNotExist(err) {
			return true
		}
	}
	return false
}
func (c *OSMCache) Remove() error {
if c.opened {
c.Close()
}
if err := os.RemoveAll(filepath.Join(c.Dir, "coords")); err != nil {
return err
}
if err := os.RemoveAll(filepath.Join(c.Dir, "nodes")); err != nil {
return err
}
if err := os.RemoveAll(filepath.Join(c.Dir, "ways")); err != nil {
return err
}
if err := os.RemoveAll(filepath.Join(c.Dir, "relations")); err != nil {
return err
2013-05-04 18:27:05 +04:00
}
if err := os.RemoveAll(filepath.Join(c.Dir, "inserted_ways")); err != nil {
return err
}
return nil
2013-05-04 18:27:05 +04:00
}
2013-02-12 22:45:49 +04:00
// Cache holds a single LevelDB database handle together with the write and
// read options that the Put*/Get* methods pass to it. It is embedded by the
// concrete cache types in this file.
type Cache struct {
db *levigo.DB
// wo and ro are created in open and reused for every write and read.
wo *levigo.WriteOptions
ro *levigo.ReadOptions
}
2013-04-16 23:14:19 +04:00
func (c *Cache) open(path string) error {
2013-02-12 22:45:49 +04:00
opts := levigo.NewOptions()
2013-05-15 10:15:33 +04:00
opts.SetCache(levigo.NewLRUCache(1024 * 1024 * 8))
2013-02-12 22:45:49 +04:00
opts.SetCreateIfMissing(true)
2013-05-13 17:28:39 +04:00
opts.SetMaxOpenFiles(64)
2013-05-08 11:10:56 +04:00
// save a few bytes by allowing leveldb to use delta enconding
// for up to n keys (instead of only 16)
opts.SetBlockRestartInterval(128)
2013-05-07 13:42:27 +04:00
if levelDbWriteBufferSize != 0 {
opts.SetWriteBufferSize(int(levelDbWriteBufferSize))
}
if levelDbWriteBlockSize != 0 {
opts.SetBlockSize(int(levelDbWriteBlockSize))
}
2013-02-12 22:45:49 +04:00
db, err := levigo.Open(path, opts)
if err != nil {
2013-04-16 23:14:19 +04:00
return err
2013-02-12 22:45:49 +04:00
}
2013-04-16 23:14:19 +04:00
c.db = db
c.wo = levigo.NewWriteOptions()
c.ro = levigo.NewReadOptions()
return nil
2013-02-12 22:45:49 +04:00
}
2013-05-04 18:27:05 +04:00
// close closes the database and releases the write and read options that
// were allocated in open. Each handle is reset to nil after it is released,
// so calling close again is a no-op.
func (c *Cache) close() {
	if c.db != nil {
		c.db.Close()
		c.db = nil
	}
	// The options wrap C allocations that are not garbage collected.
	if c.wo != nil {
		c.wo.Close()
		c.wo = nil
	}
	if c.ro != nil {
		c.ro.Close()
		c.ro = nil
	}
}
2013-04-15 23:54:48 +04:00
// NodesCache stores nodes in a LevelDB database, keyed by node id.
// PutNode and PutNodes skip nodes without tags.
type NodesCache struct {
Cache
}
// idToKeyBuf encodes id as an 8 byte big-endian database key.
func idToKeyBuf(id int64) []byte {
	key := new(bytes.Buffer)
	// Writing a fixed-size integer into an in-memory buffer does not fail,
	// so the error result is deliberately dropped.
	_ = bin.Write(key, bin.BigEndian, id)
	return key.Bytes()
}
2013-04-16 23:14:19 +04:00
func NewNodesCache(path string) (*NodesCache, error) {
cache := NodesCache{}
err := cache.open(path)
if err != nil {
return nil, err
}
return &cache, err
2013-04-15 23:54:48 +04:00
}
// CoordsCache stores node coordinates in a LevelDB database, one record per
// node id.
type CoordsCache struct {
Cache
}
2013-04-16 23:14:19 +04:00
func NewCoordsCache(path string) (*CoordsCache, error) {
cache := CoordsCache{}
err := cache.open(path)
if err != nil {
return nil, err
}
return &cache, err
2013-04-15 23:54:48 +04:00
}
type WaysCache struct {
Cache
2013-04-20 18:50:23 +04:00
toWrite chan []element.Way
2013-04-15 23:54:48 +04:00
}
2013-04-16 23:14:19 +04:00
func NewWaysCache(path string) (*WaysCache, error) {
cache := WaysCache{}
2013-04-20 18:50:23 +04:00
cache.toWrite = make(chan []element.Way)
go cache.wayWriter()
2013-04-16 23:14:19 +04:00
err := cache.open(path)
if err != nil {
return nil, err
}
return &cache, err
2013-04-15 23:54:48 +04:00
}
// InsertedWaysCache records way ids in a LevelDB database. Only the key is
// significant; PutMembers stores an empty value for each id.
type InsertedWaysCache struct {
Cache
}
// NewInsertedWaysCache opens (or creates) the inserted ways cache stored at
// path.
func NewInsertedWaysCache(path string) (*InsertedWaysCache, error) {
	inserted := new(InsertedWaysCache)
	if err := inserted.open(path); err != nil {
		return nil, err
	}
	return inserted, nil
}
2013-04-15 23:54:48 +04:00
// RelationsCache stores relations in a LevelDB database, keyed by relation
// id.
type RelationsCache struct {
Cache
}
2013-04-16 23:14:19 +04:00
func NewRelationsCache(path string) (*RelationsCache, error) {
cache := RelationsCache{}
err := cache.open(path)
if err != nil {
return nil, err
}
return &cache, err
2013-04-15 23:54:48 +04:00
}
2013-04-16 23:14:19 +04:00
func (p *CoordsCache) PutCoord(node *element.Node) error {
keyBuf := idToKeyBuf(node.Id)
data, err := binary.MarshalCoord(node)
if err != nil {
return err
}
2013-04-16 23:14:19 +04:00
return p.db.Put(p.wo, keyBuf, data)
}
2013-04-16 23:14:19 +04:00
func (p *CoordsCache) PutCoords(nodes []element.Node) error {
batch := levigo.NewWriteBatch()
defer batch.Close()
for _, node := range nodes {
keyBuf := idToKeyBuf(node.Id)
data, err := binary.MarshalCoord(&node)
if err != nil {
return err
}
batch.Put(keyBuf, data)
}
2013-04-16 23:14:19 +04:00
return p.db.Write(p.wo, batch)
}
2013-04-16 23:14:19 +04:00
func (p *CoordsCache) GetCoord(id int64) (*element.Node, error) {
keyBuf := idToKeyBuf(id)
data, err := p.db.Get(p.ro, keyBuf)
if err != nil {
2013-04-16 23:14:19 +04:00
return nil, err
}
2013-04-08 23:45:13 +04:00
if data == nil {
return nil, NotFound
2013-04-08 23:45:13 +04:00
}
node, err := binary.UnmarshalCoord(id, data)
if err != nil {
return nil, err
}
2013-04-16 23:14:19 +04:00
return node, nil
}
2013-04-16 23:14:19 +04:00
func (p *NodesCache) PutNode(node *element.Node) error {
if node.Tags == nil {
return nil
}
keyBuf := idToKeyBuf(node.Id)
2013-02-12 22:45:49 +04:00
data, err := binary.MarshalNode(node)
if err != nil {
return err
2013-02-12 22:45:49 +04:00
}
2013-04-16 23:14:19 +04:00
return p.db.Put(p.wo, keyBuf, data)
2013-02-12 22:45:49 +04:00
}
func (p *NodesCache) PutNodes(nodes []element.Node) (int, error) {
batch := levigo.NewWriteBatch()
defer batch.Close()
var n int
for _, node := range nodes {
2013-05-07 12:13:09 +04:00
if len(node.Tags) == 0 {
continue
}
keyBuf := idToKeyBuf(node.Id)
data, err := binary.MarshalNode(&node)
if err != nil {
return 0, err
}
batch.Put(keyBuf, data)
n += 1
}
return n, p.db.Write(p.wo, batch)
}
2013-04-16 23:14:19 +04:00
func (p *NodesCache) GetNode(id int64) (*element.Node, error) {
keyBuf := idToKeyBuf(id)
2013-02-12 22:45:49 +04:00
data, err := p.db.Get(p.ro, keyBuf)
if err != nil {
2013-04-16 23:14:19 +04:00
return nil, err
2013-02-12 22:45:49 +04:00
}
2013-04-08 23:45:13 +04:00
if data == nil {
return nil, NotFound
2013-04-08 23:45:13 +04:00
}
2013-02-12 22:45:49 +04:00
node, err := binary.UnmarshalNode(data)
if err != nil {
return nil, err
2013-02-12 22:45:49 +04:00
}
2013-04-16 23:14:19 +04:00
return node, nil
2013-02-12 22:45:49 +04:00
}
2013-05-15 15:21:31 +04:00
func (p *NodesCache) Iter() chan *element.Node {
2013-05-16 14:17:21 +04:00
node := make(chan *element.Node)
go func() {
ro := levigo.NewReadOptions()
ro.SetFillCache(false)
it := p.db.NewIterator(ro)
defer it.Close()
it.SeekToFirst()
for ; it.Valid(); it.Next() {
nodes, err := binary.UnmarshalNode(it.Value())
if err != nil {
panic(err)
}
node <- nodes
}
close(node)
}()
return node
2013-05-15 15:21:31 +04:00
}
2013-04-16 23:14:19 +04:00
func (p *WaysCache) PutWay(way *element.Way) error {
keyBuf := idToKeyBuf(way.Id)
data, err := binary.MarshalWay(way)
if err != nil {
return err
}
2013-04-16 23:14:19 +04:00
return p.db.Put(p.wo, keyBuf, data)
}
2013-04-16 23:14:19 +04:00
func (p *WaysCache) PutWays(ways []element.Way) error {
batch := levigo.NewWriteBatch()
defer batch.Close()
for _, way := range ways {
keyBuf := idToKeyBuf(way.Id)
data, err := binary.MarshalWay(&way)
if err != nil {
return err
}
batch.Put(keyBuf, data)
}
2013-04-16 23:14:19 +04:00
return p.db.Write(p.wo, batch)
}
2013-04-20 18:50:23 +04:00
// _PutWays hands ways over to the wayWriter goroutine through the unbuffered
// toWrite channel; the send blocks until the writer has received the slice.
// No error is reported back to the caller.
func (p *WaysCache) _PutWays(ways []element.Way) {
p.toWrite <- ways
}
func (p *WaysCache) wayWriter() {
for ways := range p.toWrite {
batch := levigo.NewWriteBatch()
defer batch.Close()
for _, way := range ways {
keyBuf := idToKeyBuf(way.Id)
2013-04-20 18:50:23 +04:00
data, err := binary.MarshalWay(&way)
if err != nil {
panic(err)
}
batch.Put(keyBuf, data)
}
_ = p.db.Write(p.wo, batch)
}
}
2013-04-16 23:14:19 +04:00
func (p *WaysCache) GetWay(id int64) (*element.Way, error) {
keyBuf := idToKeyBuf(id)
data, err := p.db.Get(p.ro, keyBuf)
if err != nil {
2013-04-16 23:14:19 +04:00
return nil, err
}
2013-04-08 23:45:13 +04:00
if data == nil {
return nil, NotFound
2013-04-08 23:45:13 +04:00
}
way, err := binary.UnmarshalWay(data)
if err != nil {
return nil, err
}
2013-04-16 23:14:19 +04:00
return way, nil
}
func (p *WaysCache) Iter() chan *element.Way {
way := make(chan *element.Way)
go func() {
ro := levigo.NewReadOptions()
ro.SetFillCache(false)
it := p.db.NewIterator(ro)
defer it.Close()
it.SeekToFirst()
2013-05-13 12:55:08 +04:00
for ; it.Valid(); it.Next() {
ways, err := binary.UnmarshalWay(it.Value())
if err != nil {
panic(err)
}
way <- ways
}
close(way)
}()
return way
}
func (self *WaysCache) FillMembers(members []element.Member) error {
2013-05-16 14:17:21 +04:00
if members == nil || len(members) == 0 {
return nil
2013-05-16 14:17:21 +04:00
}
for i, member := range members {
if member.Type != element.WAY {
continue
}
way, err := self.GetWay(member.Id)
if err != nil {
return err
2013-05-16 14:17:21 +04:00
}
members[i].Way = way
}
return nil
2013-05-16 14:17:21 +04:00
}
2013-04-16 23:14:19 +04:00
func (p *RelationsCache) PutRelation(relation *element.Relation) error {
keyBuf := idToKeyBuf(relation.Id)
2013-04-08 23:45:13 +04:00
data, err := binary.MarshalRelation(relation)
if err != nil {
return err
2013-04-08 23:45:13 +04:00
}
2013-04-16 23:14:19 +04:00
return p.db.Put(p.wo, keyBuf, data)
2013-04-08 23:45:13 +04:00
}
2013-04-20 18:50:23 +04:00
func (p *RelationsCache) PutRelations(rels []element.Relation) error {
batch := levigo.NewWriteBatch()
defer batch.Close()
for _, rel := range rels {
2013-05-17 13:42:19 +04:00
if len(rel.Tags) == 0 {
continue
}
keyBuf := idToKeyBuf(rel.Id)
2013-04-20 18:50:23 +04:00
data, err := binary.MarshalRelation(&rel)
if err != nil {
return err
2013-04-20 18:50:23 +04:00
}
batch.Put(keyBuf, data)
}
return p.db.Write(p.wo, batch)
}
func (p *RelationsCache) Iter() chan *element.Relation {
rel := make(chan *element.Relation)
go func() {
ro := levigo.NewReadOptions()
ro.SetFillCache(false)
it := p.db.NewIterator(ro)
defer it.Close()
it.SeekToFirst()
2013-05-13 12:55:08 +04:00
for ; it.Valid(); it.Next() {
relation, err := binary.UnmarshalRelation(it.Value())
if err != nil {
panic(err)
}
rel <- relation
}
close(rel)
}()
return rel
}
2013-04-16 23:14:19 +04:00
func (p *RelationsCache) GetRelation(id int64) (*element.Relation, error) {
keyBuf := idToKeyBuf(id)
2013-04-08 23:45:13 +04:00
data, err := p.db.Get(p.ro, keyBuf)
if err != nil {
2013-04-16 23:14:19 +04:00
return nil, err
2013-04-08 23:45:13 +04:00
}
if data == nil {
return nil, NotFound
2013-04-08 23:45:13 +04:00
}
relation, err := binary.UnmarshalRelation(data)
if err != nil {
return nil, err
2013-04-08 23:45:13 +04:00
}
2013-04-16 23:14:19 +04:00
return relation, err
2013-04-08 23:45:13 +04:00
}
2013-02-12 22:45:49 +04:00
// Close is the exported counterpart of close; it shuts the cache down by
// delegating to it.
func (c *Cache) Close() {
	c.close()
}
// PutMembers records the ids of all members of type element.WAY with a
// single batch write. Only the key matters; the stored value is empty.
func (p *InsertedWaysCache) PutMembers(members []element.Member) error {
	wb := levigo.NewWriteBatch()
	defer wb.Close()

	for i := range members {
		if members[i].Type == element.WAY {
			wb.Put(idToKeyBuf(members[i].Id), []byte{})
		}
	}
	return p.db.Write(p.wo, wb)
}
// IsInserted reports whether a record exists for the way id. A database
// error is returned together with false.
func (p *InsertedWaysCache) IsInserted(id int64) (bool, error) {
	raw, err := p.db.Get(p.ro, idToKeyBuf(id))
	if err != nil {
		return false, err
	}
	return raw != nil, nil
}