imposm3/cache/delta.go

376 lines
8.2 KiB
Go
Raw Permalink Normal View History

package cache
import (
2013-04-14 16:50:38 +04:00
"container/list"
"sort"
2013-04-14 16:50:38 +04:00
"sync"
2015-01-05 11:56:39 +03:00
"github.com/omniscale/imposm3/cache/binary"
"github.com/omniscale/imposm3/element"
)
2013-07-04 19:27:22 +04:00
type byId []element.Node
2013-07-04 19:27:22 +04:00
func (s byId) Len() int { return len(s) }
func (s byId) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s byId) Less(i, j int) bool { return s[i].Id < s[j].Id }
2013-07-04 19:27:22 +04:00
type coordsBunch struct {
2013-04-15 23:54:48 +04:00
sync.Mutex
id int64
coords []element.Node
2013-04-15 23:54:48 +04:00
elem *list.Element
needsWrite bool
2013-04-14 16:50:38 +04:00
}
2013-07-04 19:27:22 +04:00
func (b *coordsBunch) GetCoord(id int64) (*element.Node, error) {
2013-05-21 15:06:49 +04:00
idx := sort.Search(len(b.coords), func(i int) bool {
return b.coords[i].Id >= id
})
if idx < len(b.coords) && b.coords[idx].Id == id {
nd := b.coords[idx] // create copy prevent to race when node gets reprojected
return &nd, nil
2013-05-21 15:06:49 +04:00
}
return nil, NotFound
}
// DeleteCoord removes the node with the given id from the bunch.
// It is a no-op if the id is not present.
func (b *coordsBunch) DeleteCoord(id int64) {
	n := len(b.coords)
	pos := sort.Search(n, func(i int) bool {
		return b.coords[i].Id >= id
	})
	if pos >= n || b.coords[pos].Id != id {
		return
	}
	// Shift the tail one slot to the left and drop the last element.
	copy(b.coords[pos:], b.coords[pos+1:])
	b.coords = b.coords[:n-1]
}
// PutCoord stores a single node in the bunch and keeps coords sorted by Id.
// A node whose Id is already present is replaced, so updates are supported.
func (b *coordsBunch) PutCoord(node element.Node) {
	n := len(b.coords)
	pos := sort.Search(n, func(i int) bool {
		return b.coords[i].Id >= node.Id
	})
	switch {
	case pos == n:
		// Larger than every stored Id: append at the end.
		b.coords = append(b.coords, node)
	case b.coords[pos].Id == node.Id:
		// Same Id: overwrite the existing node.
		b.coords[pos] = node
	default:
		// Grow by one, shift the tail to the right and insert at pos.
		b.coords = append(b.coords, node)
		copy(b.coords[pos+1:], b.coords[pos:])
		b.coords[pos] = node
	}
}
// PutCoords adds several nodes at once and re-sorts the bunch by Id.
// This bulk variant does not support duplicate or updated nodes: an Id that
// is already present is not replaced.
func (b *coordsBunch) PutCoords(nodes []element.Node) {
	merged := append(b.coords, nodes...)
	sort.Sort(byId(merged))
	b.coords = merged
}
2013-04-15 23:54:48 +04:00
type DeltaCoordsCache struct {
2013-07-04 19:27:22 +04:00
cache
lruList *list.List
2013-07-04 19:27:22 +04:00
table map[int64]*coordsBunch
capacity int64
linearImport bool
mu sync.Mutex
2013-06-18 17:53:04 +04:00
bunchSize int64
readOnly bool
2013-04-14 16:50:38 +04:00
}
2013-07-04 19:27:22 +04:00
func newDeltaCoordsCache(path string) (*DeltaCoordsCache, error) {
2013-04-15 23:54:48 +04:00
coordsCache := DeltaCoordsCache{}
2013-07-04 19:27:22 +04:00
coordsCache.options = &globalCacheOptions.Coords.cacheOptions
2013-04-16 23:14:19 +04:00
err := coordsCache.open(path)
if err != nil {
return nil, err
}
2013-07-04 19:27:22 +04:00
coordsCache.bunchSize = int64(globalCacheOptions.Coords.BunchSize)
2013-04-15 23:54:48 +04:00
coordsCache.lruList = list.New()
2013-06-18 17:53:04 +04:00
// mem req for cache approx. capacity*bunchSize*40
2013-07-04 19:27:22 +04:00
coordsCache.capacity = int64(globalCacheOptions.Coords.BunchCacheCapacity)
coordsCache.table = make(map[int64]*coordsBunch, coordsCache.capacity)
2013-04-16 23:14:19 +04:00
return &coordsCache, nil
2013-04-14 16:50:38 +04:00
}
// SetLinearImport switches linear-import mode on or off. PutCoords reads
// this flag to choose between its bulk paths and per-node inserts.
func (self *DeltaCoordsCache) SetLinearImport(v bool) {
	self.linearImport = v
}
2015-01-05 11:56:39 +03:00
func (self *DeltaCoordsCache) Flush() error {
self.mu.Lock()
defer self.mu.Unlock()
for bunchId, bunch := range self.table {
2013-04-15 23:54:48 +04:00
if bunch.needsWrite {
2015-01-05 11:56:39 +03:00
err := self.putCoordsPacked(bunchId, bunch.coords)
if err != nil {
return err
}
2013-04-15 23:54:48 +04:00
}
2013-04-14 16:50:38 +04:00
}
self.lruList.Init()
for k, _ := range self.table {
delete(self.table, k)
}
2015-01-05 11:56:39 +03:00
return nil
2013-05-14 11:26:28 +04:00
}
2015-01-05 11:56:39 +03:00
func (self *DeltaCoordsCache) Close() error {
err := self.Flush()
if err != nil {
return err
}
2013-07-04 19:27:22 +04:00
self.cache.Close()
2015-01-05 11:56:39 +03:00
return nil
2013-04-14 16:50:38 +04:00
}
// SetReadOnly switches read-only mode on or off. In read-only mode GetCoord
// releases the bunch lock before it reads the bunch.
func (self *DeltaCoordsCache) SetReadOnly(val bool) {
	self.readOnly = val
}
func (self *DeltaCoordsCache) GetCoord(id int64) (*element.Node, error) {
2013-06-18 17:53:04 +04:00
bunchId := self.getBunchId(id)
bunch, err := self.getBunch(bunchId)
if err != nil {
return nil, err
}
if self.readOnly {
bunch.Unlock()
} else {
defer bunch.Unlock()
}
2013-05-21 15:06:49 +04:00
return bunch.GetCoord(id)
2013-04-14 16:50:38 +04:00
}
// DeleteCoord removes the node with the given id from its bunch and marks
// the bunch as modified, whether or not the id was present.
// It returns the error from loading the bunch, if any.
func (self *DeltaCoordsCache) DeleteCoord(id int64) error {
	bunch, err := self.getBunch(self.getBunchId(id))
	if err != nil {
		return err
	}
	defer bunch.Unlock()

	bunch.DeleteCoord(id)
	bunch.needsWrite = true
	return nil
}
func (self *DeltaCoordsCache) FillWay(way *element.Way) error {
if way == nil {
return nil
}
way.Nodes = make([]element.Node, len(way.Refs))
2013-05-21 15:06:49 +04:00
var err error
2013-07-04 19:27:22 +04:00
var bunch *coordsBunch
2013-05-21 15:06:49 +04:00
var bunchId, lastBunchId int64
lastBunchId = -1
for i, id := range way.Refs {
2013-06-18 17:53:04 +04:00
bunchId = self.getBunchId(id)
2013-05-21 15:06:49 +04:00
// re-use bunches
if bunchId != lastBunchId {
if bunch != nil {
bunch.Unlock()
}
bunch, err = self.getBunch(bunchId)
if err != nil {
return err
}
}
lastBunchId = bunchId
nd, err := bunch.GetCoord(id)
if err != nil {
2013-05-21 15:06:49 +04:00
bunch.Unlock()
return err
}
way.Nodes[i] = *nd
}
2013-05-21 15:06:49 +04:00
if bunch != nil {
bunch.Unlock()
}
return nil
}
2013-11-07 20:28:42 +04:00
// removeSkippedNodes compacts nodes in place, dropping every node whose Id
// equals SKIP, and returns the shortened slice. The result shares the
// backing array of the input.
func removeSkippedNodes(nodes []element.Node) []element.Node {
	kept := 0
	for i := range nodes {
		// Move the current node into the next free slot; this is a
		// self-assignment while nothing has been dropped yet.
		nodes[kept] = nodes[i]
		if nodes[i].Id == SKIP {
			// Leave the slot to be overwritten by the next node.
			continue
		}
		kept++
	}
	return nodes[:kept]
}
// PutCoords puts nodes into cache.
// nodes need to be sorted by Id.
func (self *DeltaCoordsCache) PutCoords(nodes []element.Node) error {
var start, currentBunchId int64
nodes = removeSkippedNodes(nodes)
if len(nodes) == 0 {
// skipped all nodes
return nil
}
2013-06-18 17:53:04 +04:00
currentBunchId = self.getBunchId(nodes[0].Id)
2013-04-15 23:54:48 +04:00
start = 0
totalNodes := len(nodes)
2013-04-15 23:54:48 +04:00
for i, node := range nodes {
2013-06-18 17:53:04 +04:00
bunchId := self.getBunchId(node.Id)
if bunchId != currentBunchId {
2013-06-18 17:53:04 +04:00
if self.linearImport && int64(i) > self.bunchSize && int64(i) < int64(totalNodes)-self.bunchSize {
// no need to handle concurrent updates to the same
2013-06-18 17:53:04 +04:00
// bunch if we are not at the boundary of a self.bunchSize
2015-01-05 11:56:39 +03:00
err := self.putCoordsPacked(currentBunchId, nodes[start:i])
if err != nil {
return err
}
} else {
bunch, err := self.getBunch(currentBunchId)
if err != nil {
return err
}
if self.linearImport {
bunch.PutCoords(nodes[start:i])
} else {
for _, node := range nodes[start:i] {
// single inserts to handle updated coords
bunch.PutCoord(node)
}
}
bunch.needsWrite = true
bunch.Unlock()
}
currentBunchId = bunchId
2013-04-15 23:54:48 +04:00
start = int64(i)
}
2013-04-14 16:50:38 +04:00
}
bunch, err := self.getBunch(currentBunchId)
if err != nil {
return err
}
if self.linearImport {
bunch.PutCoords(nodes[start:])
} else {
for _, node := range nodes[start:] {
// single inserts to handle updated coords
bunch.PutCoord(node)
}
}
2013-04-15 23:54:48 +04:00
bunch.needsWrite = true
bunch.Unlock()
return nil
2013-04-14 16:50:38 +04:00
}
func (p *DeltaCoordsCache) putCoordsPacked(bunchId int64, nodes []element.Node) error {
keyBuf := idToKeyBuf(bunchId)
2013-04-15 23:54:48 +04:00
if len(nodes) == 0 {
return p.db.Delete(p.wo, keyBuf)
2013-04-14 16:50:38 +04:00
}
2015-11-21 00:54:53 +03:00
data := make([]byte, 512)
2013-06-03 13:03:45 +04:00
data = binary.MarshalDeltaNodes(nodes, data)
err := p.db.Put(p.wo, keyBuf, data)
2013-05-13 09:39:38 +04:00
if err != nil {
return err
2013-05-13 09:39:38 +04:00
}
2013-06-03 13:03:45 +04:00
return nil
2013-04-14 16:50:38 +04:00
}
func (p *DeltaCoordsCache) getCoordsPacked(bunchId int64, nodes []element.Node) ([]element.Node, error) {
2013-05-10 13:11:29 +04:00
keyBuf := idToKeyBuf(bunchId)
2013-04-14 16:50:38 +04:00
2013-04-15 23:54:48 +04:00
data, err := p.db.Get(p.ro, keyBuf)
if err != nil {
return nil, err
2013-04-14 16:50:38 +04:00
}
if data == nil {
2013-05-06 09:28:02 +04:00
// clear before returning
return nodes[:0], nil
}
nodes, err = binary.UnmarshalDeltaNodes(data, nodes)
2013-04-15 23:54:48 +04:00
if err != nil {
return nil, err
2013-04-14 16:50:38 +04:00
}
2013-04-15 23:54:48 +04:00
return nodes, nil
2013-04-14 16:50:38 +04:00
}
2013-06-18 17:53:04 +04:00
func (self *DeltaCoordsCache) getBunchId(nodeId int64) int64 {
return nodeId / self.bunchSize
2013-04-14 16:50:38 +04:00
}
2013-07-04 19:27:22 +04:00
func (self *DeltaCoordsCache) getBunch(bunchId int64) (*coordsBunch, error) {
2013-04-15 23:54:48 +04:00
self.mu.Lock()
bunch, ok := self.table[bunchId]
2013-05-06 09:28:02 +04:00
var nodes []element.Node
needsGet := false
2013-04-14 16:50:38 +04:00
if !ok {
elem := self.lruList.PushFront(bunchId)
2015-11-21 00:54:53 +03:00
nodes = make([]element.Node, 0, self.bunchSize)
2013-11-13 16:17:54 +04:00
bunch = &coordsBunch{id: bunchId, coords: nodes, elem: elem}
needsGet = true
self.table[bunchId] = bunch
2013-04-15 23:54:48 +04:00
} else {
self.lruList.MoveToFront(bunch.elem)
2013-04-14 16:50:38 +04:00
}
2013-04-15 23:54:48 +04:00
bunch.Lock()
2015-01-05 11:56:39 +03:00
err := self.CheckCapacity()
2013-06-05 15:01:26 +04:00
self.mu.Unlock()
2015-01-05 11:56:39 +03:00
if err != nil {
return nil, err
}
if needsGet {
nodes, err := self.getCoordsPacked(bunchId, nodes)
if err != nil {
return nil, err
}
bunch.coords = nodes
}
return bunch, nil
2013-04-14 16:50:38 +04:00
}
2013-04-15 23:54:48 +04:00
2015-01-05 11:56:39 +03:00
func (self *DeltaCoordsCache) CheckCapacity() error {
2013-04-15 23:54:48 +04:00
for int64(len(self.table)) > self.capacity {
elem := self.lruList.Back()
bunchId := self.lruList.Remove(elem).(int64)
bunch := self.table[bunchId]
bunch.elem = nil
2013-04-15 23:54:48 +04:00
if bunch.needsWrite {
2015-01-05 11:56:39 +03:00
if err := self.putCoordsPacked(bunchId, bunch.coords); err != nil {
return err
}
2013-04-15 23:54:48 +04:00
}
delete(self.table, bunchId)
2013-04-15 23:54:48 +04:00
}
2015-01-05 11:56:39 +03:00
return nil
2013-04-15 23:54:48 +04:00
}
2015-01-05 11:56:39 +03:00
func (self *DeltaCoordsCache) FirstRefIsCached(refs []int64) (bool, error) {
if len(refs) <= 0 {
2015-01-05 11:56:39 +03:00
return false, nil
}
_, err := self.GetCoord(refs[0])
2015-01-05 11:56:39 +03:00
if err == NotFound {
return false, nil
}
if err != nil {
2015-01-05 11:56:39 +03:00
return false, err
}
2015-01-05 11:56:39 +03:00
return true, nil
}