67 lines
1.2 KiB
Go
67 lines
1.2 KiB
Go
package writer
|
|
|
|
import (
|
|
"sync"
|
|
)
|
|
|
|
const bufferSize = 1024 * 8
|
|
|
|
type InsertBatch struct {
|
|
Table string
|
|
Rows [][]interface{}
|
|
}
|
|
|
|
type InsertElement struct {
|
|
Table string
|
|
Row []interface{}
|
|
}
|
|
|
|
type InsertBuffer struct {
|
|
In chan InsertElement
|
|
Out chan InsertBatch
|
|
wg *sync.WaitGroup
|
|
}
|
|
|
|
func NewInsertBuffer() *InsertBuffer {
|
|
ib := InsertBuffer{
|
|
In: make(chan InsertElement, 64),
|
|
Out: make(chan InsertBatch, 1),
|
|
wg: &sync.WaitGroup{},
|
|
}
|
|
ib.wg.Add(1)
|
|
go ib.loop()
|
|
return &ib
|
|
}
|
|
|
|
func (ib *InsertBuffer) Close() {
|
|
close(ib.In)
|
|
ib.wg.Wait()
|
|
close(ib.Out)
|
|
}
|
|
|
|
func (ib *InsertBuffer) Insert(table string, row []interface{}) {
|
|
ib.In <- InsertElement{table, row}
|
|
}
|
|
|
|
func (ib *InsertBuffer) loop() {
|
|
buffer := make(map[string]*InsertBatch)
|
|
|
|
for elem := range ib.In {
|
|
if batch, ok := buffer[elem.Table]; ok {
|
|
batch.Rows = append(batch.Rows, elem.Row)
|
|
} else {
|
|
buffer[elem.Table] = &InsertBatch{elem.Table, [][]interface{}{elem.Row}}
|
|
}
|
|
if len(buffer[elem.Table].Rows) > bufferSize {
|
|
batch := buffer[elem.Table]
|
|
delete(buffer, elem.Table)
|
|
ib.Out <- *batch
|
|
}
|
|
}
|
|
for table, batch := range buffer {
|
|
delete(buffer, table)
|
|
ib.Out <- *batch
|
|
}
|
|
ib.wg.Done()
|
|
}
|