make GeneralizedTableTx synchronous
parent
f55516b8ad
commit
753e5a2caa
|
@ -31,7 +31,7 @@ type tableTx struct {
|
|||
func NewTableTx(pg *PostGIS, spec *TableSpec, bulkImport bool) TableTx {
|
||||
tt := &tableTx{
|
||||
Pg: pg,
|
||||
Table: spec.Prefix + spec.Name,
|
||||
Table: spec.FullName,
|
||||
Spec: spec,
|
||||
wg: &sync.WaitGroup{},
|
||||
rows: make(chan []interface{}, 64),
|
||||
|
@ -139,20 +139,14 @@ type generalizedTableTx struct {
|
|||
DeleteStmt *sql.Stmt
|
||||
InsertSql string
|
||||
DeleteSql string
|
||||
wg *sync.WaitGroup
|
||||
rows chan []interface{}
|
||||
}
|
||||
|
||||
func NewGeneralizedTableTx(pg *PostGIS, spec *GeneralizedTableSpec) TableTx {
|
||||
tt := &generalizedTableTx{
|
||||
Pg: pg,
|
||||
Table: spec.Prefix + spec.Name,
|
||||
Table: spec.FullName,
|
||||
Spec: spec,
|
||||
wg: &sync.WaitGroup{},
|
||||
rows: make(chan []interface{}, 64),
|
||||
}
|
||||
tt.wg.Add(1)
|
||||
go tt.loop()
|
||||
return tt
|
||||
}
|
||||
|
||||
|
@ -182,19 +176,11 @@ func (tt *generalizedTableTx) Begin() error {
|
|||
}
|
||||
|
||||
func (tt *generalizedTableTx) Insert(row []interface{}) error {
|
||||
tt.rows <- row
|
||||
return nil
|
||||
}
|
||||
|
||||
func (tt *generalizedTableTx) loop() {
|
||||
for row := range tt.rows {
|
||||
_, err := tt.InsertStmt.Exec(row[0])
|
||||
if err != nil {
|
||||
// TODO
|
||||
log.Fatal(&SQLInsertError{SQLError{tt.InsertSql, err}, row})
|
||||
}
|
||||
_, err := tt.InsertStmt.Exec(row[0])
|
||||
if err != nil {
|
||||
return &SQLInsertError{SQLError{tt.InsertSql, err}, row}
|
||||
}
|
||||
tt.wg.Done()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (tt *generalizedTableTx) Delete(id int64) error {
|
||||
|
@ -206,8 +192,6 @@ func (tt *generalizedTableTx) Delete(id int64) error {
|
|||
}
|
||||
|
||||
func (tt *generalizedTableTx) Commit() error {
|
||||
close(tt.rows)
|
||||
tt.wg.Wait()
|
||||
err := tt.Tx.Commit()
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
Loading…
Reference in New Issue