imposm3/cache/diff.go

322 lines
6.1 KiB
Go
Raw Normal View History

package cache
import (
2013-05-13 09:36:54 +04:00
"bytes"
"encoding/binary"
2013-05-13 11:54:00 +04:00
"github.com/jmhodges/levigo"
"goposm/element"
2013-05-13 09:36:54 +04:00
"io"
"log"
"os"
"path/filepath"
2013-05-06 15:42:58 +04:00
"sort"
"sync"
)
2013-05-13 11:54:00 +04:00
// Refs is a list of int64 references that implements sort.Interface,
// ordering its elements ascending.
type Refs []int64

// Len reports the number of references.
func (r Refs) Len() int {
	return len(r)
}

// Swap exchanges the references at positions i and j.
func (r Refs) Swap(i, j int) {
	tmp := r[i]
	r[i] = r[j]
	r[j] = tmp
}

// Less reports whether the reference at i is smaller than the one at j.
func (r Refs) Less(i, j int) bool {
	return r[j] > r[i]
}
type DiffCache struct {
Dir string
2013-05-13 11:54:00 +04:00
Coords *CoordsRefIndex
Ways *WaysRefIndex
opened bool
}
func (c *DiffCache) Close() {
if c.Coords != nil {
2013-05-17 17:30:22 +04:00
c.Coords.Close()
c.Coords = nil
}
if c.Ways != nil {
2013-05-17 17:30:22 +04:00
c.Ways.Close()
c.Ways = nil
}
}
// NewDiffCache returns a DiffCache rooted at dir. Nothing is opened or
// created on disk until Open is called.
func NewDiffCache(dir string) *DiffCache {
	return &DiffCache{
		Dir: dir,
	}
}
func (c *DiffCache) Open() error {
var err error
2013-05-13 11:54:00 +04:00
c.Coords, err = NewCoordsRefIndex(filepath.Join(c.Dir, "coords_index"))
if err != nil {
c.Close()
return err
}
2013-05-13 11:54:00 +04:00
c.Ways, err = NewWaysRefIndex(filepath.Join(c.Dir, "ways_index"))
if err != nil {
c.Close()
return err
}
c.opened = true
return nil
}
// Exists reports whether the cache is opened or whether either index
// directory can be found below c.Dir. Any Stat result other than
// "does not exist" counts as present.
func (c *DiffCache) Exists() bool {
	if c.opened {
		return true
	}
	for _, name := range []string{"coords_index", "ways_index"} {
		_, err := os.Stat(filepath.Join(c.Dir, name))
		if !os.IsNotExist(err) {
			return true
		}
	}
	return false
}
// Remove closes the cache if it is opened and deletes both index
// directories below c.Dir. It stops at, and returns, the first error.
func (c *DiffCache) Remove() error {
	if c.opened {
		c.Close()
	}
	for _, name := range []string{"coords_index", "ways_index"} {
		err := os.RemoveAll(filepath.Join(c.Dir, name))
		if err != nil {
			return err
		}
	}
	return nil
}
type RefIndex struct {
Cache
2013-05-13 11:54:00 +04:00
cache map[int64][]int64
write chan map[int64][]int64
add chan idRef
mu sync.Mutex
waitAdd *sync.WaitGroup
waitWrite *sync.WaitGroup
}
type (
	// CoordsRefIndex is a RefIndex filled from the nodes of ways
	// (see AddFromWay): node id -> ids of the ways using it.
	CoordsRefIndex struct {
		RefIndex
	}

	// WaysRefIndex is a RefIndex filled from relation members
	// (see AddFromMembers): way id -> ids of the relations using it.
	WaysRefIndex struct {
		RefIndex
	}
)
2013-05-13 11:54:00 +04:00
// idRef is one pending index entry: ref is to be recorded under id.
type idRef struct {
	id, ref int64
}
2013-06-19 13:50:57 +04:00
// cacheSize is the number of distinct ids an in-memory batch may hold before
// it is handed to the writer; it is also the initial map capacity.
const cacheSize = 256 << 10
2013-05-13 11:54:00 +04:00
2013-06-19 15:05:02 +04:00
// refCaches holds at most one emptied batch map so that dispatch can reuse
// it instead of allocating a new one.
var refCaches = make(chan map[int64][]int64, 1)
2013-06-18 17:53:04 +04:00
func NewRefIndex(path string, opts *CacheOptions) (*RefIndex, error) {
index := RefIndex{}
2013-06-18 17:53:04 +04:00
index.options = opts
err := index.open(path)
if err != nil {
return nil, err
}
2013-05-30 14:00:11 +04:00
index.write = make(chan map[int64][]int64, 2)
2013-05-13 11:54:00 +04:00
index.cache = make(map[int64][]int64, cacheSize)
2013-05-30 14:00:11 +04:00
index.add = make(chan idRef, 1024)
2013-05-13 11:54:00 +04:00
index.waitWrite = &sync.WaitGroup{}
index.waitAdd = &sync.WaitGroup{}
index.waitWrite.Add(1)
index.waitAdd.Add(1)
go index.writer()
go index.dispatch()
return &index, nil
}
2013-05-13 11:54:00 +04:00
func NewCoordsRefIndex(dir string) (*CoordsRefIndex, error) {
2013-06-18 17:53:04 +04:00
cache, err := NewRefIndex(dir, &osmCacheOptions.CoordsIndex)
2013-05-13 11:54:00 +04:00
refIdx, err := &CoordsRefIndex{*cache}, err
return refIdx, err
}
func NewWaysRefIndex(dir string) (*WaysRefIndex, error) {
2013-06-18 17:53:04 +04:00
cache, err := NewRefIndex(dir, &osmCacheOptions.WaysIndex)
2013-05-13 11:54:00 +04:00
return &WaysRefIndex{*cache}, err
}
// writer persists every batch received on index.write until the channel is
// closed, logging (not returning) write errors, and then signals waitWrite.
func (index *RefIndex) writer() {
	for {
		pending, ok := <-index.write
		if !ok {
			break
		}
		err := index.writeRefs(pending)
		if err != nil {
			log.Println("error while writing ref index", err)
		}
	}
	index.waitWrite.Done()
}
func (index *RefIndex) Close() {
close(index.add)
index.waitAdd.Wait()
close(index.write)
index.waitWrite.Wait()
2013-05-14 13:54:39 +04:00
index.Cache.Close()
2013-05-13 11:54:00 +04:00
}
func (index *RefIndex) dispatch() {
for idRef := range index.add {
index.addToCache(idRef.id, idRef.ref)
if len(index.cache) >= cacheSize {
index.write <- index.cache
2013-06-19 15:05:02 +04:00
select {
case index.cache = <-refCaches:
default:
index.cache = make(map[int64][]int64, cacheSize)
}
2013-05-13 11:54:00 +04:00
}
}
2013-05-14 13:54:39 +04:00
if len(index.cache) > 0 {
2013-05-13 11:54:00 +04:00
index.write <- index.cache
2013-05-14 13:54:39 +04:00
index.cache = nil
2013-05-13 11:54:00 +04:00
}
index.waitAdd.Done()
}
// AddFromWay queues one entry per node of way, recording the way's id under
// each node id. It blocks while the add channel is full.
func (index *CoordsRefIndex) AddFromWay(way *element.Way) {
	for i := range way.Nodes {
		entry := idRef{id: way.Nodes[i].Id, ref: way.Id}
		index.add <- entry
	}
}
// AddFromMembers queues one entry per member of type element.WAY, recording
// relId under the member's id. Members of any other type are skipped.
func (index *WaysRefIndex) AddFromMembers(relId int64, members []element.Member) {
	for i := range members {
		m := members[i]
		if m.Type != element.WAY {
			continue
		}
		index.add <- idRef{id: m.Id, ref: relId}
	}
}
2013-05-13 11:54:00 +04:00
// addToCache inserts ref into the sorted reference list kept for id in the
// in-memory batch, creating the list on first use.
func (index *RefIndex) addToCache(id, ref int64) {
	existing, found := index.cache[id]
	if !found {
		existing = make([]int64, 0, 1)
	}
	index.cache[id] = insertRefs(existing, ref)
}
func (index *RefIndex) writeRefs(idRefs map[int64][]int64) error {
batch := levigo.NewWriteBatch()
defer batch.Close()
for id, refs := range idRefs {
keyBuf := idToKeyBuf(id)
data := index.loadAppendMarshal(id, refs)
batch.Put(keyBuf, data)
}
2013-06-19 15:05:02 +04:00
go func() {
for k, _ := range idRefs {
delete(idRefs, k)
}
select {
case refCaches <- idRefs:
}
}()
2013-05-13 11:54:00 +04:00
return index.db.Write(index.wo, batch)
}
func (index *RefIndex) loadAppendMarshal(id int64, newRefs []int64) []byte {
2013-05-10 13:11:29 +04:00
keyBuf := idToKeyBuf(id)
data, err := index.db.Get(index.ro, keyBuf)
if err != nil {
panic(err)
}
2013-05-13 09:36:54 +04:00
var refs []int64
if data != nil {
2013-05-13 09:36:54 +04:00
refs = UnmarshalRefs(data)
}
2013-05-13 09:36:54 +04:00
if refs == nil {
2013-05-13 11:54:00 +04:00
refs = newRefs
} else {
refs = append(refs, newRefs...)
sort.Sort(Refs(refs))
}
2013-05-13 09:36:54 +04:00
data = MarshalRefs(refs)
2013-05-13 11:54:00 +04:00
return data
}
2013-05-13 09:36:54 +04:00
// insertRefs inserts ref into the ascending sorted slice refs and returns the
// resulting slice, which is still sorted.
//
// A ref that is already present is not inserted a second time, so the list
// never accumulates duplicates (e.g. a closed way referencing the same node
// twice). refs may be nil or empty.
func insertRefs(refs []int64, ref int64) []int64 {
	// i is the first position whose value is >= ref, or len(refs).
	i := sort.Search(len(refs), func(i int) bool {
		return refs[i] >= ref
	})
	if i == len(refs) {
		// ref is larger than everything present.
		return append(refs, ref)
	}
	if refs[i] == ref {
		// Already recorded.
		return refs
	}
	// Grow by one and shift the tail right to open a slot at i.
	refs = append(refs, 0)
	copy(refs[i+1:], refs[i:])
	refs[i] = ref
	return refs
}
func (index *RefIndex) Get(id int64) []int64 {
2013-05-10 13:11:29 +04:00
keyBuf := idToKeyBuf(id)
data, err := index.db.Get(index.ro, keyBuf)
2013-05-14 13:54:39 +04:00
if err != nil {
panic(err)
}
2013-05-13 09:36:54 +04:00
var refs []int64
if data != nil {
2013-05-13 09:36:54 +04:00
refs = UnmarshalRefs(data)
if err != nil {
panic(err)
}
}
2013-05-13 09:36:54 +04:00
return refs
}
// UnmarshalRefs decodes buf, a sequence of signed varints holding the delta
// of each reference to its predecessor (the first one relative to 0), back
// into the list of absolute references.
//
// Decoding stops at the end of the input. Any other read error is logged and
// the references decoded up to that point are returned.
func UnmarshalRefs(buf []byte) []int64 {
	var prev int64
	result := make([]int64, 0, 8)
	src := bytes.NewReader(buf)
	for {
		delta, err := binary.ReadVarint(src)
		if err != nil {
			if err != io.EOF {
				log.Println("error while unmarshaling refs:", err)
			}
			return result
		}
		prev += delta
		result = append(result, prev)
	}
}
// MarshalRefs encodes refs as a sequence of signed varints, each holding the
// delta to the previous reference (the first one relative to 0). The output
// is what UnmarshalRefs reads back.
//
// An empty or nil refs yields an empty, non-nil slice.
func MarshalRefs(refs []int64) []byte {
	// Initial guess of four bytes per reference plus room for one
	// worst-case varint; the buffer doubles whenever that room runs out.
	buf := make([]byte, len(refs)*4+binary.MaxVarintLen64)
	lastRef := int64(0)
	nextPos := 0
	for _, ref := range refs {
		// PutVarint panics on a short buffer, so always keep space for
		// the largest possible encoding before writing.
		if len(buf)-nextPos < binary.MaxVarintLen64 {
			tmp := make([]byte, len(buf)*2)
			copy(tmp, buf)
			buf = tmp
		}
		nextPos += binary.PutVarint(buf[nextPos:], ref-lastRef)
		lastRef = ref
	}
	return buf[:nextPos]
}