remove deleted elements from cache in diff mode

master
Oliver Tonnhofer 2013-07-05 15:14:21 +02:00
parent eb449b2636
commit f75c8994c8
11 changed files with 199 additions and 45 deletions

cache/delta.go

@ -84,6 +84,15 @@ func (b *coordsBunch) GetCoord(id int64) (*element.Node, error) {
return nil, NotFound
}
func (b *coordsBunch) DeleteCoord(id int64) {
idx := sort.Search(len(b.coords), func(i int) bool {
return b.coords[i].Id >= id
})
if idx < len(b.coords) && b.coords[idx].Id == id {
b.coords = append(b.coords[:idx], b.coords[idx+1:]...)
}
}
type DeltaCoordsCache struct {
cache
lruList *list.List
@ -144,6 +153,18 @@ func (self *DeltaCoordsCache) GetCoord(id int64) (*element.Node, error) {
return bunch.GetCoord(id)
}
func (self *DeltaCoordsCache) DeleteCoord(id int64) error {
bunchId := self.getBunchId(id)
bunch, err := self.getBunch(bunchId)
if err != nil {
return err
}
defer bunch.Unlock()
bunch.DeleteCoord(id)
bunch.needsWrite = true
return nil
}
func (self *DeltaCoordsCache) FillWay(way *element.Way) error {
if way == nil {
return nil
@ -226,11 +247,12 @@ var (
)
func (p *DeltaCoordsCache) putCoordsPacked(bunchId int64, nodes []element.Node) error {
if len(nodes) == 0 {
return nil
}
keyBuf := idToKeyBuf(bunchId)
if len(nodes) == 0 {
return p.db.Delete(p.wo, keyBuf)
}
var data []byte
select {
case data = <-freeBuffer:

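The new DeleteCoord drops a single node from its sorted bunch and marks the bunch as dirty, and putCoordsPacked now deletes the whole bunch key once the last coordinate is gone, where it previously returned without touching the stored record. A minimal usage sketch, going through the OSM cache the way the updated main file does (the goposm/cache and goposm/element import paths are assumptions, not part of this diff):

package main

import (
    "log"

    "goposm/cache"   // assumed import path for this repository's cache package
    "goposm/element" // assumed import path for the element package
)

func main() {
    osmCache := cache.NewOSMCache("/tmp/goposm") // same call as in the updated main file
    if err := osmCache.Open(); err != nil {
        log.Fatal(err)
    }
    defer osmCache.Close()

    // store one coordinate, then drop it again, as a diff import would
    osmCache.Coords.PutCoords([]element.Node{{Id: 999999}})
    if err := osmCache.Coords.DeleteCoord(999999); err != nil {
        log.Fatal(err)
    }

    // the id is gone; further lookups report cache.NotFound
    if _, err := osmCache.Coords.GetCoord(999999); err != cache.NotFound {
        log.Println("expected NotFound, got:", err)
    }
}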
cache/delta_test.go

@ -48,7 +48,6 @@ func TestReadWriteDeltaCoords(t *testing.T) {
if err != nil {
t.Fatal()
}
defer cache.Close()
for i := 0; i < len(nodes); i++ {
data, err := cache.GetCoord(int64(i))
@ -67,4 +66,34 @@ func TestReadWriteDeltaCoords(t *testing.T) {
t.Error("missing node returned not NotFound")
}
// test delete
cache.PutCoords([]element.Node{mknode(999999)})
cache.Close()
cache, err = newDeltaCoordsCache(cache_dir)
if err != nil {
t.Fatal()
}
_, err = cache.GetCoord(999999)
if err == NotFound {
t.Error("missing coord")
}
err = cache.DeleteCoord(999999)
if err != nil {
t.Fatal(err)
}
cache.Close()
cache, err = newDeltaCoordsCache(cache_dir)
if err != nil {
t.Fatal()
}
defer cache.Close()
_, err = cache.GetCoord(999999)
if err != NotFound {
t.Fatal("deleted node returned not NotFound")
}
}

cache/diff.go

@ -103,8 +103,9 @@ func insertRefs(refs []int64, ref int64) []int64 {
}
type idRef struct {
id int64
ref int64 // for single id/ref
id int64
ref int64
delete bool
}
type idRefs struct {
@ -122,31 +123,41 @@ type idRefBunch struct {
type idRefBunches map[int64]idRefBunch
func (bunches *idRefBunches) add(bunchId, id, ref int64) {
idRefs := bunches.getIdRefsCreateMissing(bunchId, id)
idRefs.refs = insertRefs(idRefs.refs, ref)
}
func (bunches *idRefBunches) delete(bunchId, id int64) {
idRefs := bunches.getIdRefsCreateMissing(bunchId, id)
idRefs.refs = nil
}
func (bunches *idRefBunches) getIdRefsCreateMissing(bunchId, id int64) *idRefs {
bunch, ok := (*bunches)[bunchId]
if !ok {
bunch = idRefBunch{id: bunchId}
}
var targetIdRefs *idRefs
var result *idRefs
i := sort.Search(len(bunch.idRefs), func(i int) bool {
return bunch.idRefs[i].id >= id
})
if i < len(bunch.idRefs) && bunch.idRefs[i].id >= id {
if bunch.idRefs[i].id == id {
targetIdRefs = &bunch.idRefs[i]
result = &bunch.idRefs[i]
} else {
bunch.idRefs = append(bunch.idRefs, idRefs{})
copy(bunch.idRefs[i+1:], bunch.idRefs[i:])
bunch.idRefs[i] = idRefs{id: id}
targetIdRefs = &bunch.idRefs[i]
result = &bunch.idRefs[i]
}
} else {
bunch.idRefs = append(bunch.idRefs, idRefs{id: id})
targetIdRefs = &bunch.idRefs[len(bunch.idRefs)-1]
result = &bunch.idRefs[len(bunch.idRefs)-1]
}
targetIdRefs.refs = insertRefs(targetIdRefs.refs, ref)
(*bunches)[bunchId] = bunch
return result
}
var idRefBunchesPool chan idRefBunches
@ -243,7 +254,11 @@ func (index *bunchRefCache) Close() {
func (index *bunchRefCache) dispatch() {
for idRef := range index.add {
index.buffer.add(index.getBunchId(idRef.id), idRef.id, idRef.ref)
if idRef.delete {
index.buffer.delete(index.getBunchId(idRef.id), idRef.id)
} else {
index.buffer.add(index.getBunchId(idRef.id), idRef.id, idRef.ref)
}
if len(index.buffer) >= bufferSize {
index.write <- index.buffer
select {
@ -335,25 +350,34 @@ NextIdRef:
// search place in bunch
for i := lastIdx; i < len(bunch); i++ {
if bunch[i].id == newIdRefs.id {
// id already present, add refs
for _, r := range newIdRefs.refs {
bunch[i].refs = insertRefs(bunch[i].refs, r)
// id already present
if len(newIdRefs.refs) == 0 {
// no new refs -> delete
bunch = append(bunch[:i], bunch[i+1:]...)
} else { // otherwise add refs
for _, r := range newIdRefs.refs {
bunch[i].refs = insertRefs(bunch[i].refs, r)
}
}
lastIdx = i
break NextIdRef
continue NextIdRef
}
if bunch[i].id > newIdRefs.id {
// insert before
bunch = append(bunch, idRefs{})
copy(bunch[i+1:], bunch[i:])
bunch[i] = newIdRefs
if len(newIdRefs.refs) > 0 {
bunch = append(bunch, idRefs{})
copy(bunch[i+1:], bunch[i:])
bunch[i] = newIdRefs
}
lastIdx = i
break NextIdRef
continue NextIdRef
}
}
// insert at the end
bunch = append(bunch, newIdRefs)
lastIdx = len(bunch) - 1
if len(newIdRefs.refs) > 0 {
bunch = append(bunch, newIdRefs)
lastIdx = len(bunch) - 1
}
}
return bunch
}
@ -398,6 +422,10 @@ func (index *bunchRefCache) Get(id int64) []int64 {
return nil
}
func (index *bunchRefCache) Delete(id int64) {
index.add <- idRef{id: id, delete: true}
}
func marshalBunch(idRefs []idRefs) []byte {
buf := make([]byte, len(idRefs)*(4+1+6)+binary.MaxVarintLen64)

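On the ref cache used for diff tracking there is no direct key delete: Delete queues an idRef with the delete flag on the same channel that adds use, dispatch() routes it to buffer.delete, and mergeBunch drops entries whose refs ended up empty when the bunch is written back. From the caller's side this looks like the sketch below (NewDiffCache and its Open method are assumptions modelled on the NewOSMCache/Open calls in the main file; the Delete and Get calls match the ones used there and in cache/query/query.go):

package main

import (
    "log"

    "goposm/cache" // assumed import path
)

func main() {
    diffCache := cache.NewDiffCache("/tmp/goposm") // assumed constructor, mirroring NewOSMCache
    if err := diffCache.Open(); err != nil {       // assumed, mirroring osmCache.Open()
        log.Fatal(err)
    }

    // node 1001 was removed in the .osc file: forget which ways referenced it
    diffCache.Coords.Delete(1001)

    // the delete travels through the same channel as adds; dispatch() hands it to
    // buffer.delete and mergeBunch removes the entry when the bunch is flushed,
    // so a reopened cache reports no refs for 1001 via diffCache.Coords.Get(1001)
    diffCache.Close()
}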
cache/diff_test.go

@ -57,7 +57,7 @@ func TestWriteDiff(t *testing.T) {
for w := 0; w < 5; w++ {
for n := 0; n < 200; n++ {
cache.add <- idRef{int64(n), int64(w)}
cache.add <- idRef{id: int64(n), ref: int64(w)}
}
}
cache.Close()
@ -139,7 +139,7 @@ func BenchmarkWriteDiff(b *testing.B) {
for i := 0; i < b.N; i++ {
for w := 0; w < 5; w++ {
for n := 0; n < 200; n++ {
cache.add <- idRef{int64(n), int64(w)}
cache.add <- idRef{id: int64(n), ref: int64(w)}
}
}
}
@ -182,6 +182,20 @@ func TestMergeIdRefs(t *testing.T) {
if len(bunch) != 4 {
t.Fatal(bunch)
}
// remove multiple
bunch = mergeBunch(bunch, []idRefs{idRefs{40, []int64{}}, idRefs{60, []int64{}}})
if bunch[0].id != 50 || bunch[1].id != 70 || len(bunch) != 2 {
t.Fatal(bunch)
}
// add multiple
bunch = mergeBunch(bunch, []idRefs{idRefs{40, []int64{1}}, idRefs{60, []int64{1}}, idRefs{80, []int64{1}}})
if len(bunch) != 5 || bunch[0].id != 40 ||
bunch[2].id != 60 || bunch[4].id != 80 {
t.Fatal(bunch)
}
}
func TestIdRefBunches(t *testing.T) {

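The removal half of mergeBunch can be exercised directly as well; a test-style sketch in the spirit of TestMergeIdRefs above (the test name is made up, the behaviour follows the new code in cache/diff.go):

package cache

import "testing"

func TestMergeBunchDeleteSketch(t *testing.T) {
    bunch := []idRefs{{id: 10, refs: []int64{1}}, {id: 20, refs: []int64{2}}}

    // an entry with refs is merged in, an entry without refs deletes the id
    bunch = mergeBunch(bunch, []idRefs{{id: 10, refs: []int64{}}, {id: 30, refs: []int64{3}}})

    if len(bunch) != 2 || bunch[0].id != 20 || bunch[1].id != 30 {
        t.Fatal(bunch)
    }
}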
cache/nodes.go

@ -69,6 +69,11 @@ func (p *NodesCache) GetNode(id int64) (*element.Node, error) {
return node, nil
}
func (p *NodesCache) DeleteNode(id int64) error {
keyBuf := idToKeyBuf(id)
return p.db.Delete(p.wo, keyBuf)
}
func (p *NodesCache) Iter() chan *element.Node {
nodes := make(chan *element.Node)
go func() {

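DeleteNode only covers one of the two places a node can live: tagged nodes sit in this cache, plain coordinates in the delta coords cache (the "missing nodes can still be Coords" comment in the update loop below reflects the same split). Deleting a node therefore has to hit both caches, as in this fragment, which reuses the osmCache value from the sketch after cache/delta.go (the id is made up):

// remove node 123 from both the nodes cache and the coords cache
if err := osmCache.Nodes.DeleteNode(123); err != nil {
    log.Fatal(err)
}
if err := osmCache.Coords.DeleteCoord(123); err != nil {
    log.Fatal(err)
}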
cache/query/query.go

@ -51,15 +51,15 @@ func printWays(osmCache *cache.OSMCache, diffCache *cache.DiffCache, ids []int64
printNodes(osmCache, nil, way.Refs, false)
log.SetPrefix(oldPrefix)
}
if deps {
oldPrefix := log.Prefix()
log.SetPrefix(oldPrefix + " ")
rels := diffCache.Ways.Get(id)
if len(rels) != 0 {
printRelations(osmCache, rels, false)
}
log.SetPrefix(oldPrefix)
}
if deps {
oldPrefix := log.Prefix()
log.SetPrefix(oldPrefix + " ")
rels := diffCache.Ways.Get(id)
if len(rels) != 0 {
printRelations(osmCache, rels, false)
}
log.SetPrefix(oldPrefix)
}
}
}
@ -80,15 +80,15 @@ func printNodes(osmCache *cache.OSMCache, diffCache *cache.DiffCache, ids []int64
}
if node != nil {
log.Println(node)
if deps {
oldPrefix := log.Prefix()
log.SetPrefix(oldPrefix + " ")
ways := diffCache.Coords.Get(id)
if len(ways) != 0 {
printWays(osmCache, diffCache, ways, false, true)
}
log.SetPrefix(oldPrefix)
}
if deps {
oldPrefix := log.Prefix()
log.SetPrefix(oldPrefix + " ")
ways := diffCache.Coords.Get(id)
if len(ways) != 0 {
printWays(osmCache, diffCache, ways, false, true)
}
log.SetPrefix(oldPrefix)
}
}
}

cache/relations.go

@ -85,3 +85,8 @@ func (p *RelationsCache) GetRelation(id int64) (*element.Relation, error) {
relation.Id = id
return relation, err
}
func (p *RelationsCache) DeleteRelation(id int64) error {
keyBuf := idToKeyBuf(id)
return p.db.Delete(p.wo, keyBuf)
}

cache/ways.go

@ -61,6 +61,11 @@ func (p *WaysCache) GetWay(id int64) (*element.Way, error) {
return way, nil
}
func (p *WaysCache) DeleteWay(id int64) error {
keyBuf := idToKeyBuf(id)
return p.db.Delete(p.wo, keyBuf)
}
func (p *WaysCache) Iter() chan *element.Way {
ways := make(chan *element.Way, 1024)
go func() {


@ -16,6 +16,7 @@ type DB interface {
End() error
Abort() error
Init() error
Close() error
RowInserter
}
@ -82,6 +83,7 @@ type NullDb struct{}
func (n *NullDb) Init() error { return nil }
func (n *NullDb) Begin() error { return nil }
func (n *NullDb) End() error { return nil }
func (n *NullDb) Close() error { return nil }
func (n *NullDb) Abort() error { return nil }
func (n *NullDb) Insert(string, []interface{}) {}

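Close() is added to the DB interface and to NullDb so that the per-file update loop below can close the database after each change file; PostGIS forwards it to its underlying connection handle. A one-line compile-time check, assuming the DB and NullDb definitions above live in the same package:

// illustrative compile-time assertion: *NullDb still satisfies DB after Close() was added
var _ DB = (*NullDb)(nil)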

@ -458,6 +458,10 @@ func (pg *PostGIS) End() error {
return pg.InputBuffer.End()
}
func (pg *PostGIS) Close() error {
return pg.Db.Close()
}
type TableTx struct {
Pg *PostGIS
Tx *sql.Tx


@ -23,7 +23,14 @@ var (
func main() {
flag.Parse()
elems, errc := parser.Parse(flag.Arg(0))
for _, oscFile := range flag.Args() {
update(oscFile)
}
}
func update(oscFile string) {
flag.Parse()
elems, errc := parser.Parse(oscFile)
osmCache := cache.NewOSMCache("/tmp/goposm")
err := osmCache.Open()
@ -116,7 +123,24 @@ For:
if elem.Del {
deleter.Delete(elem)
if !elem.Add {
// TODO delete from osmCache
if elem.Rel != nil {
if err := osmCache.Relations.DeleteRelation(elem.Rel.Id); err != nil {
log.Fatal(err)
}
} else if elem.Way != nil {
if err := osmCache.Ways.DeleteWay(elem.Way.Id); err != nil {
log.Fatal(err)
}
diffCache.Ways.Delete(elem.Way.Id)
} else if elem.Node != nil {
if err := osmCache.Nodes.DeleteNode(elem.Node.Id); err != nil {
log.Fatal(err)
}
if err := osmCache.Coords.DeleteCoord(elem.Node.Id); err != nil {
log.Fatal(err)
}
diffCache.Coords.Delete(elem.Node.Id)
}
}
}
if elem.Add {
@ -151,6 +175,9 @@ For:
for nodeId, _ := range nodeIds {
node, err := osmCache.Nodes.GetNode(nodeId)
if err != nil {
if err != cache.NotFound {
log.Println(node, err)
}
// missing nodes can still be Coords
// no `continue` here
}
@ -168,7 +195,9 @@ For:
for wayId, _ := range wayIds {
way, err := osmCache.Ways.GetWay(wayId)
if err != nil {
log.Println(wayId, err)
if err != cache.NotFound {
log.Println(way, err)
}
continue
}
// insert new way
@ -183,7 +212,9 @@ For:
for relId, _ := range relIds {
rel, err := osmCache.Relations.GetRelation(relId)
if err != nil {
log.Println(err)
if err != cache.NotFound {
log.Println(rel, err)
}
continue
}
// insert new relation
@ -198,7 +229,16 @@ For:
relWriter.Close()
wayWriter.Close()
err = db.End()
if err != nil {
log.Fatal(err)
}
err = db.Close()
if err != nil {
log.Fatal(err)
}
progress.Stop()
osmCache.Coords.Flush()
osmCache.Close()
diffCache.Close()
}
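Taken together, the cache-clearing part of the elem.Del branch removes an element from every cache it can occupy before new data is applied. Condensed into a switch, it amounts to this (same calls as in the diff above, nothing new):

// condensed restatement of the delete branch shown above
if elem.Del && !elem.Add {
    switch {
    case elem.Rel != nil:
        if err := osmCache.Relations.DeleteRelation(elem.Rel.Id); err != nil {
            log.Fatal(err)
        }
    case elem.Way != nil:
        if err := osmCache.Ways.DeleteWay(elem.Way.Id); err != nil {
            log.Fatal(err)
        }
        diffCache.Ways.Delete(elem.Way.Id)
    case elem.Node != nil:
        if err := osmCache.Nodes.DeleteNode(elem.Node.Id); err != nil {
            log.Fatal(err)
        }
        if err := osmCache.Coords.DeleteCoord(elem.Node.Id); err != nil {
            log.Fatal(err)
        }
        diffCache.Coords.Delete(elem.Node.Id)
    }
}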