create goroutine for each TableTx
parent
8a9fe9bf2f
commit
a415956ca7
|
@ -9,6 +9,7 @@ import (
|
|||
"goposm/logging"
|
||||
"goposm/mapping"
|
||||
"strings"
|
||||
"sync"
|
||||
)
|
||||
|
||||
var log = logging.NewLogger("PostGIS")
|
||||
|
@ -399,6 +400,8 @@ type TableTx struct {
|
|||
Spec *TableSpec
|
||||
Stmt *sql.Stmt
|
||||
Sql string
|
||||
wg *sync.WaitGroup
|
||||
rows chan []interface{}
|
||||
}
|
||||
|
||||
func (tt *TableTx) Begin() error {
|
||||
|
@ -417,13 +420,21 @@ func (tt *TableTx) Begin() error {
|
|||
}
|
||||
|
||||
func (tt *TableTx) Insert(row []interface{}) error {
|
||||
_, err := tt.Stmt.Exec(row...)
|
||||
if err != nil {
|
||||
return &SQLInsertError{SQLError{tt.Sql, err}, row}
|
||||
}
|
||||
tt.rows <- row
|
||||
return nil
|
||||
}
|
||||
|
||||
func (tt *TableTx) loop() {
|
||||
for row := range tt.rows {
|
||||
_, err := tt.Stmt.Exec(row...)
|
||||
if err != nil {
|
||||
// TODO
|
||||
log.Fatal(&SQLInsertError{SQLError{tt.Sql, err}, row})
|
||||
}
|
||||
}
|
||||
tt.wg.Done()
|
||||
}
|
||||
|
||||
func (tt *TableTx) Delete(id int64) error {
|
||||
sql := tt.Spec.DeleteSQL()
|
||||
stmt, err := tt.Tx.Prepare(sql)
|
||||
|
@ -440,6 +451,8 @@ func (tt *TableTx) Delete(id int64) error {
|
|||
}
|
||||
|
||||
func (tt *TableTx) Commit() error {
|
||||
close(tt.rows)
|
||||
tt.wg.Wait()
|
||||
err := tt.Tx.Commit()
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -453,11 +466,16 @@ func (tt *TableTx) Rollback() {
|
|||
}
|
||||
|
||||
func (pg *PostGIS) NewTableTx(spec *TableSpec) *TableTx {
|
||||
return &TableTx{
|
||||
tt := &TableTx{
|
||||
Pg: pg,
|
||||
Table: spec.Name,
|
||||
Spec: spec,
|
||||
wg: &sync.WaitGroup{},
|
||||
rows: make(chan []interface{}, 128),
|
||||
}
|
||||
tt.wg.Add(1)
|
||||
go tt.loop()
|
||||
return tt
|
||||
}
|
||||
|
||||
func New(conf database.Config, m *mapping.Mapping) (database.DB, error) {
|
||||
|
|
Loading…
Reference in New Issue