avoid duplicate buffer
parent
2fc074e421
commit
4af47c15cf
|
@ -10,18 +10,12 @@ type DeleteElement struct {
|
|||
}
|
||||
|
||||
type InsertBuffer struct {
|
||||
insertc chan InsertElement
|
||||
deletec chan DeleteElement
|
||||
done chan bool
|
||||
Tables map[string]*TableTx
|
||||
Tables map[string]*TableTx
|
||||
}
|
||||
|
||||
func NewInsertBuffer(pg *PostGIS, bulkImport bool) *InsertBuffer {
|
||||
ib := InsertBuffer{
|
||||
insertc: make(chan InsertElement),
|
||||
deletec: make(chan DeleteElement),
|
||||
done: make(chan bool),
|
||||
Tables: make(map[string]*TableTx),
|
||||
Tables: make(map[string]*TableTx),
|
||||
}
|
||||
for tableName, table := range pg.Tables {
|
||||
tt := pg.NewTableTx(table, bulkImport)
|
||||
|
@ -32,12 +26,10 @@ func NewInsertBuffer(pg *PostGIS, bulkImport bool) *InsertBuffer {
|
|||
ib.Tables[tableName] = tt
|
||||
}
|
||||
|
||||
go ib.loop()
|
||||
return &ib
|
||||
}
|
||||
|
||||
func (ib *InsertBuffer) End() error {
|
||||
ib.Close()
|
||||
for _, tt := range ib.Tables {
|
||||
if err := tt.Commit(); err != nil {
|
||||
return err
|
||||
|
@ -47,42 +39,24 @@ func (ib *InsertBuffer) End() error {
|
|||
}
|
||||
|
||||
func (ib *InsertBuffer) Abort() error {
|
||||
ib.Close()
|
||||
for _, tt := range ib.Tables {
|
||||
tt.Rollback()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ib *InsertBuffer) Close() {
|
||||
ib.done <- true
|
||||
}
|
||||
|
||||
func (ib *InsertBuffer) Insert(table string, row []interface{}) {
|
||||
ib.insertc <- InsertElement{table, row}
|
||||
tt, ok := ib.Tables[table]
|
||||
if !ok {
|
||||
panic("unknown table " + table)
|
||||
}
|
||||
tt.Insert(row)
|
||||
}
|
||||
|
||||
func (ib *InsertBuffer) Delete(table string, id int64) {
|
||||
ib.deletec <- DeleteElement{table, id}
|
||||
}
|
||||
|
||||
func (ib *InsertBuffer) loop() {
|
||||
for {
|
||||
select {
|
||||
case elem := <-ib.insertc:
|
||||
tt, ok := ib.Tables[elem.Table]
|
||||
if !ok {
|
||||
panic("unknown table " + elem.Table)
|
||||
}
|
||||
tt.Insert(elem.Row)
|
||||
case elem := <-ib.deletec:
|
||||
tt, ok := ib.Tables[elem.Table]
|
||||
if !ok {
|
||||
panic("unknown table " + elem.Table)
|
||||
}
|
||||
tt.Delete(elem.Id)
|
||||
case <-ib.done:
|
||||
return
|
||||
}
|
||||
tt, ok := ib.Tables[table]
|
||||
if !ok {
|
||||
panic("unknown table " + table)
|
||||
}
|
||||
tt.Delete(id)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue