imposm3/cache/diff.go

package cache

import (
	"bytes"
	"encoding/binary"
	"github.com/jmhodges/levigo"
	"goposm/element"
	"log"
	"os"
	"path/filepath"
	"runtime"
	"sort"
	"sync"
)
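
// byInt64 implements sort.Interface for plain []int64 slices.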
type byInt64 []int64

func (a byInt64) Len() int           { return len(a) }
func (a byInt64) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a byInt64) Less(i, j int) bool { return a[i] < a[j] }
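
// DiffCache bundles the reverse indices used for diff processing:
// which ways use a node (Coords) and which relations use a way (Ways).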
type DiffCache struct {
	Dir    string
	Coords *CoordsRefIndex
	Ways   *WaysRefIndex
	opened bool
}

func NewDiffCache(dir string) *DiffCache {
	cache := &DiffCache{Dir: dir}
	return cache
}

func (c *DiffCache) Close() {
	if c.Coords != nil {
		c.Coords.Close()
		c.Coords = nil
	}
	if c.Ways != nil {
		c.Ways.Close()
		c.Ways = nil
	}
}

func (c *DiffCache) Open() error {
	var err error
	c.Coords, err = newCoordsRefIndex(filepath.Join(c.Dir, "coords_index"))
	if err != nil {
		c.Close()
		return err
	}
	c.Ways, err = newWaysRefIndex(filepath.Join(c.Dir, "ways_index"))
	if err != nil {
		c.Close()
		return err
	}
	c.opened = true
	return nil
}

func (c *DiffCache) Exists() bool {
	if c.opened {
		return true
	}
	if _, err := os.Stat(filepath.Join(c.Dir, "coords_index")); !os.IsNotExist(err) {
		return true
	}
	if _, err := os.Stat(filepath.Join(c.Dir, "ways_index")); !os.IsNotExist(err) {
		return true
	}
	return false
}

func (c *DiffCache) Remove() error {
	if c.opened {
		c.Close()
	}
	if err := os.RemoveAll(filepath.Join(c.Dir, "coords_index")); err != nil {
		return err
	}
	if err := os.RemoveAll(filepath.Join(c.Dir, "ways_index")); err != nil {
		return err
	}
	return nil
}
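
// bufferSize is the number of idRefBunches that are collected in memory
// before they are handed to the writer goroutine.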
const bufferSize = 64 * 1024
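
// insertRefs inserts ref into the sorted refs slice, keeping it sorted
// and free of duplicates.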
func insertRefs(refs []int64, ref int64) []int64 {
	i := sort.Search(len(refs), func(i int) bool {
		return refs[i] >= ref
	})
	if i < len(refs) && refs[i] >= ref {
		if refs[i] > ref {
			refs = append(refs, 0)
			copy(refs[i+1:], refs[i:])
			refs[i] = ref
		} // else already inserted
	} else {
		refs = append(refs, ref)
	}
	return refs
}
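
// deleteRefs removes ref from the sorted refs slice if it is present.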
func deleteRefs(refs []int64, ref int64) []int64 {
	i := sort.Search(len(refs), func(i int) bool {
		return refs[i] >= ref
	})
	if i < len(refs) && refs[i] == ref {
		refs = append(refs[:i], refs[i+1:]...)
	}
	return refs
}

// idRef is a single id/ref pair.
type idRef struct {
	id  int64
	ref int64
}

// idRefs stores all refs of a single id.
type idRefs struct {
	id   int64
	refs []int64
}

// idRefBunch stores the idRefs of multiple ids that share a bunch id.
type idRefBunch struct {
	id     int64 // the bunch id
	idRefs []idRefs
}

// idRefBunches maps bunch ids to their idRefBunch.
type idRefBunches map[int64]idRefBunch

func (bunches *idRefBunches) add(bunchId, id, ref int64) {
	idRefs := bunches.getCreate(bunchId, id)
	idRefs.refs = insertRefs(idRefs.refs, ref)
}

func (bunches *idRefBunches) getCreate(bunchId, id int64) *idRefs {
	bunch, ok := (*bunches)[bunchId]
	if !ok {
		bunch = idRefBunch{id: bunchId}
	}
	result := bunch.getCreate(id)
	(*bunches)[bunchId] = bunch
	return result
}
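
// get returns the idRefs for id, or nil if id is not in the bunch.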
func (bunch *idRefBunch) get(id int64) *idRefs {
	var result *idRefs
	i := sort.Search(len(bunch.idRefs), func(i int) bool {
		return bunch.idRefs[i].id >= id
	})
	if i < len(bunch.idRefs) && bunch.idRefs[i].id == id {
		result = &bunch.idRefs[i]
	}
	return result
}
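
// getCreate returns the idRefs for id, inserting a new entry at the
// sorted position if it does not exist yet.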
func (bunch *idRefBunch) getCreate(id int64) *idRefs {
	var result *idRefs
	i := sort.Search(len(bunch.idRefs), func(i int) bool {
		return bunch.idRefs[i].id >= id
	})
	if i < len(bunch.idRefs) && bunch.idRefs[i].id >= id {
		if bunch.idRefs[i].id == id {
			result = &bunch.idRefs[i]
		} else {
			bunch.idRefs = append(bunch.idRefs, idRefs{})
			copy(bunch.idRefs[i+1:], bunch.idRefs[i:])
			bunch.idRefs[i] = idRefs{id: id}
			result = &bunch.idRefs[i]
		}
	} else {
		bunch.idRefs = append(bunch.idRefs, idRefs{id: id})
		result = &bunch.idRefs[len(bunch.idRefs)-1]
	}
	return result
}
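
// idRefBunchesPool recycles emptied idRefBunches maps between the
// writer and the dispatcher so large maps are not reallocated for
// every flush.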
var idRefBunchesPool chan idRefBunches

func init() {
	idRefBunchesPool = make(chan idRefBunches, 1)
}

// bunchRefCache stores the refs of multiple ids under a single LevelDB
// key (one bunch per key, see getBunchId).
type bunchRefCache struct {
	cache
	linearImport bool
	buffer       idRefBunches
	write        chan idRefBunches
	addc         chan idRef
	mu           sync.Mutex
	waitAdd      *sync.WaitGroup
	waitWrite    *sync.WaitGroup
}

func newRefIndex(path string, opts *cacheOptions) (*bunchRefCache, error) {
	index := bunchRefCache{}
	index.options = opts
	err := index.open(path)
	if err != nil {
		return nil, err
	}
	index.write = make(chan idRefBunches, 2)
	index.buffer = make(idRefBunches, bufferSize)
	index.addc = make(chan idRef, 1024)
	index.waitWrite = &sync.WaitGroup{}
	index.waitAdd = &sync.WaitGroup{}
	return &index, nil
}
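
// SetLinearImport enables or disables the buffered import mode. While
// enabled, new refs are collected in memory and written in bunches by
// background goroutines; Get, Delete, and DeleteRef are not supported.
// Disabling the mode flushes the buffer and waits for all writes.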
func (index *bunchRefCache) SetLinearImport(val bool) {
	if val == index.linearImport {
		panic("programming error, linear import already set")
	}
	if val {
		index.waitWrite.Add(1)
		index.waitAdd.Add(1)
		go index.writer()
		go index.dispatch()
		index.linearImport = true
	} else {
		close(index.addc)
		index.waitAdd.Wait()
		close(index.write)
		index.waitWrite.Wait()
		index.linearImport = false
	}
}

// CoordsRefIndex maps node ids to the ids of the ways that use them.
type CoordsRefIndex struct {
	bunchRefCache
}

// WaysRefIndex maps way ids to the ids of the relations that reference them.
type WaysRefIndex struct {
	bunchRefCache
}

func newCoordsRefIndex(dir string) (*CoordsRefIndex, error) {
	cache, err := newRefIndex(dir, &globalCacheOptions.CoordsIndex)
	if err != nil {
		return nil, err
	}
	return &CoordsRefIndex{*cache}, nil
}

func newWaysRefIndex(dir string) (*WaysRefIndex, error) {
	cache, err := newRefIndex(dir, &globalCacheOptions.WaysIndex)
	if err != nil {
		return nil, err
	}
	return &WaysRefIndex{*cache}, nil
}
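
// getBunchId returns the bunch id of an element id; 64 consecutive ids
// share the same bunch.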
func (index *bunchRefCache) getBunchId(id int64) int64 {
	return id / 64
}

func (index *bunchRefCache) Close() {
	if index.linearImport {
		// disable linear import first to flush buffer
		index.SetLinearImport(false)
	}

	index.cache.Close()
}
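
// writer persists buffered bunches that are sent on the write channel.
// It runs as a goroutine while linear import is enabled.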
func (index *bunchRefCache) writer() {
	for buffer := range index.write {
		if err := index.writeRefs(buffer); err != nil {
			log.Println("error while writing ref index", err)
		}
	}
	index.waitWrite.Done()
}
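
// dispatch collects idRefs from the addc channel into the in-memory
// buffer and hands full buffers to the writer. It runs as a goroutine
// while linear import is enabled.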
func (index *bunchRefCache) dispatch() {
	for idRef := range index.addc {
		index.buffer.add(index.getBunchId(idRef.id), idRef.id, idRef.ref)
		if len(index.buffer) >= bufferSize {
			index.write <- index.buffer
			select {
			case index.buffer = <-idRefBunchesPool:
			default:
				index.buffer = make(idRefBunches, bufferSize)
			}
		}
	}
	if len(index.buffer) > 0 {
		index.write <- index.buffer
		index.buffer = nil
	}
	index.waitAdd.Done()
}

type loadBunchItem struct {
	bunchId int64
	bunch   idRefBunch
}

type writeBunchItem struct {
	bunchIdBuf []byte
	data       []byte
}
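
// writeRefs merges the buffered bunches into the existing database
// entries and writes them back in a single batch. Loading, merging, and
// marshalling is distributed over NumCPU worker goroutines.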
func (index *bunchRefCache) writeRefs(idRefs idRefBunches) error {
	batch := levigo.NewWriteBatch()
	defer batch.Close()

	wg := sync.WaitGroup{}
	putc := make(chan writeBunchItem)
	loadc := make(chan loadBunchItem)

	for i := 0; i < runtime.NumCPU(); i++ {
		wg.Add(1)
		go func() {
			for item := range loadc {
				keyBuf := idToKeyBuf(item.bunchId)
				putc <- writeBunchItem{
					keyBuf,
					index.loadMergeMarshal(keyBuf, item.bunch.idRefs),
				}
			}
			wg.Done()
		}()
	}

	go func() {
		for bunchId, bunch := range idRefs {
			loadc <- loadBunchItem{bunchId, bunch}
		}
		close(loadc)
		wg.Wait()
		close(putc)
	}()

	for item := range putc {
		batch.Put(item.bunchIdBuf, item.data)
	}

	go func() {
		// clear the map and return it to the pool for reuse
		for k := range idRefs {
			delete(idRefs, k)
		}
		select {
		case idRefBunchesPool <- idRefs:
		}
	}()

	return index.db.Write(index.wo, batch)
}
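
// mergeBunch merges the idRefs of newBunch into the sorted bunch.
// Entries with an empty refs list mark deletions and are removed.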
func mergeBunch(bunch, newBunch []idRefs) []idRefs {
	lastIdx := 0

NextIdRef:
	// for each new idRef...
	for _, newIdRefs := range newBunch {
		// search place in bunch
		for i := lastIdx; i < len(bunch); i++ {
			if bunch[i].id == newIdRefs.id {
				// id already present
				if len(newIdRefs.refs) == 0 {
					// no new refs -> delete
					bunch = append(bunch[:i], bunch[i+1:]...)
				} else { // otherwise add refs
					for _, r := range newIdRefs.refs {
						bunch[i].refs = insertRefs(bunch[i].refs, r)
					}
				}
				lastIdx = i
				continue NextIdRef
			}
			if bunch[i].id > newIdRefs.id {
				// insert before
				if len(newIdRefs.refs) > 0 {
					bunch = append(bunch, idRefs{})
					copy(bunch[i+1:], bunch[i:])
					bunch[i] = newIdRefs
				}
				lastIdx = i
				continue NextIdRef
			}
		}
		// insert at the end
		if len(newIdRefs.refs) > 0 {
			bunch = append(bunch, newIdRefs)
			lastIdx = len(bunch) - 1
		}
	}
	return bunch
}
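
// loadMergeMarshal loads the existing bunch for keyBuf, merges newBunch
// into it, and returns the marshalled result.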
func (index *bunchRefCache) loadMergeMarshal(keyBuf []byte, newBunch []idRefs) []byte {
	data, err := index.db.Get(index.ro, keyBuf)
	if err != nil {
		panic(err)
	}

	var bunch []idRefs
	if data != nil {
		bunch = unmarshalBunch(data)
	}

	if bunch == nil {
		bunch = newBunch
	} else {
		bunch = mergeBunch(bunch, newBunch)
	}

	data = marshalBunch(bunch)
	return data
}
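
// Get returns the refs of id, or nil if the id is not in the index.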
func (index *bunchRefCache) Get(id int64) []int64 {
	if index.linearImport {
		panic("programming error: get not supported in linearImport mode")
	}
	keyBuf := idToKeyBuf(index.getBunchId(id))
	data, err := index.db.Get(index.ro, keyBuf)
	if err != nil {
		panic(err)
	}
	if data != nil {
		for _, idRef := range unmarshalBunch(data) {
			if idRef.id == id {
				return idRef.refs
			}
		}
	}
	return nil
}
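
// Add inserts ref for id by loading, updating, and rewriting the
// complete bunch.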
func (index *bunchRefCache) Add(id, ref int64) error {
	keyBuf := idToKeyBuf(index.getBunchId(id))
	data, err := index.db.Get(index.ro, keyBuf)
	if err != nil {
		return err
	}
	var idRefs []idRefs
	if data != nil {
		idRefs = unmarshalBunch(data)
	}
	idRefBunch := idRefBunch{index.getBunchId(id), idRefs}
	idRef := idRefBunch.getCreate(id)
	idRef.refs = insertRefs(idRef.refs, ref)
	data = marshalBunch(idRefBunch.idRefs)
	return index.db.Put(index.wo, keyBuf, data)
}
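
// DeleteRef removes a single ref from the refs of id.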
func (index *bunchRefCache) DeleteRef(id, ref int64) error {
	if index.linearImport {
		panic("programming error: delete not supported in linearImport mode")
	}
	keyBuf := idToKeyBuf(index.getBunchId(id))
	data, err := index.db.Get(index.ro, keyBuf)
	if err != nil {
		return err
	}
	if data != nil {
		idRefs := unmarshalBunch(data)
		idRefBunch := idRefBunch{index.getBunchId(id), idRefs}
		idRef := idRefBunch.get(id)
		if idRef != nil {
			idRef.refs = deleteRefs(idRef.refs, ref)
			data := marshalBunch(idRefs)
			return index.db.Put(index.wo, keyBuf, data)
		}
	}
	return nil
}
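
// Delete removes all refs of id by storing an empty refs list.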
func (index *bunchRefCache) Delete(id int64) error {
	if index.linearImport {
		panic("programming error: delete not supported in linearImport mode")
	}
	keyBuf := idToKeyBuf(index.getBunchId(id))
	data, err := index.db.Get(index.ro, keyBuf)
	if err != nil {
		return err
	}
	if data != nil {
		idRefs := unmarshalBunch(data)
		idRefBunch := idRefBunch{index.getBunchId(id), idRefs}
		idRef := idRefBunch.get(id)
		if idRef != nil {
			idRef.refs = []int64{}
			data := marshalBunch(idRefs)
			return index.db.Put(index.wo, keyBuf, data)
		}
	}
	return nil
}
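
// AddFromWay records way.Id as a ref for every node of the way.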
func (index *CoordsRefIndex) AddFromWay(way *element.Way) {
	for _, node := range way.Nodes {
		if index.linearImport {
			index.addc <- idRef{id: node.Id, ref: way.Id}
		} else {
			index.Add(node.Id, way.Id)
		}
	}
}
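
// DeleteFromWay removes way.Id from the refs of every node of the way.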
func (index *CoordsRefIndex) DeleteFromWay(way *element.Way) {
	if index.linearImport {
		panic("programming error: delete not supported in linearImport mode")
	}
	for _, node := range way.Nodes {
		index.DeleteRef(node.Id, way.Id)
	}
}
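
// AddFromMembers records relId as a ref for every way member of a relation.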
func (index *WaysRefIndex) AddFromMembers(relId int64, members []element.Member) {
	for _, member := range members {
		if member.Type == element.WAY {
			if index.linearImport {
				index.addc <- idRef{id: member.Id, ref: relId}
			} else {
				index.Add(member.Id, relId)
			}
		}
	}
}
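
// marshalBunch encodes a bunch as: the number of ids (uvarint), the
// delta-encoded ids (varint), the number of refs per id (uvarint), and
// the delta-encoded refs of all ids (varint).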
func marshalBunch(idRefs []idRefs) []byte {
	buf := make([]byte, len(idRefs)*(4+1+6)+binary.MaxVarintLen64)

	lastRef := int64(0)
	lastId := int64(0)
	nextPos := 0

	nextPos += binary.PutUvarint(buf[nextPos:], uint64(len(idRefs)))

	for _, idRef := range idRefs {
		if len(buf)-nextPos < binary.MaxVarintLen64 {
			tmp := make([]byte, len(buf)*2)
			copy(tmp, buf)
			buf = tmp
		}
		nextPos += binary.PutVarint(buf[nextPos:], idRef.id-lastId)
		lastId = idRef.id
	}
	for _, idRef := range idRefs {
		if len(buf)-nextPos < binary.MaxVarintLen64 {
			tmp := make([]byte, len(buf)*2)
			copy(tmp, buf)
			buf = tmp
		}
		nextPos += binary.PutUvarint(buf[nextPos:], uint64(len(idRef.refs)))
	}
	for _, idRef := range idRefs {
		for _, ref := range idRef.refs {
			if len(buf)-nextPos < binary.MaxVarintLen64 {
				tmp := make([]byte, len(buf)*2)
				copy(tmp, buf)
				buf = tmp
			}
			nextPos += binary.PutVarint(buf[nextPos:], ref-lastRef)
			lastRef = ref
		}
	}
	return buf[:nextPos]
}
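
// unmarshalBunch decodes a bunch that was encoded by marshalBunch.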
func unmarshalBunch(buf []byte) []idRefs {
	r := bytes.NewBuffer(buf)
	n, err := binary.ReadUvarint(r)
	if err != nil {
		return nil
	}

	idRefs := make([]idRefs, n)

	last := int64(0)
	for i := 0; uint64(i) < n; i++ {
		idRefs[i].id, err = binary.ReadVarint(r)
		if err != nil {
			panic(err)
		}
		idRefs[i].id += last
		last = idRefs[i].id
	}

	var numRefs uint64
	for i := 0; uint64(i) < n; i++ {
		numRefs, err = binary.ReadUvarint(r)
		if err != nil {
			panic(err)
		}
		idRefs[i].refs = make([]int64, numRefs)
	}

	last = 0
	for idIdx := 0; uint64(idIdx) < n; idIdx++ {
		for refIdx := 0; refIdx < len(idRefs[idIdx].refs); refIdx++ {
			idRefs[idIdx].refs[refIdx], err = binary.ReadVarint(r)
			if err != nil {
				panic(err)
			}
			idRefs[idIdx].refs[refIdx] += last
			last = idRefs[idIdx].refs[refIdx]
		}
	}
	return idRefs
}