imposm3/writer/buffer.go

67 lines
1.2 KiB
Go

package writer
import (
"sync"
)
const bufferSize = 1024 * 8
type InsertBatch struct {
Table string
Rows [][]interface{}
}
type InsertElement struct {
Table string
Row []interface{}
}
type InsertBuffer struct {
In chan InsertElement
Out chan InsertBatch
wg *sync.WaitGroup
}
func NewInsertBuffer() *InsertBuffer {
ib := InsertBuffer{
In: make(chan InsertElement, 64),
Out: make(chan InsertBatch, 1),
wg: &sync.WaitGroup{},
}
ib.wg.Add(1)
go ib.loop()
return &ib
}
func (ib *InsertBuffer) Close() {
close(ib.In)
ib.wg.Wait()
close(ib.Out)
}
func (ib *InsertBuffer) Insert(table string, row []interface{}) {
ib.In <- InsertElement{table, row}
}
func (ib *InsertBuffer) loop() {
buffer := make(map[string]*InsertBatch)
for elem := range ib.In {
if batch, ok := buffer[elem.Table]; ok {
batch.Rows = append(batch.Rows, elem.Row)
} else {
buffer[elem.Table] = &InsertBatch{elem.Table, [][]interface{}{elem.Row}}
}
if len(buffer[elem.Table].Rows) > bufferSize {
batch := buffer[elem.Table]
delete(buffer, elem.Table)
ib.Out <- *batch
}
}
for table, batch := range buffer {
delete(buffer, table)
ib.Out <- *batch
}
ib.wg.Done()
}