remove unused InsertBuffer
parent
5c59fe8a78
commit
d387f3cd24
|
@ -1,66 +0,0 @@
|
|||
package writer
|
||||
|
||||
import (
|
||||
"sync"
|
||||
)
|
||||
|
||||
const bufferSize = 1024 * 8
|
||||
|
||||
type InsertBatch struct {
|
||||
Table string
|
||||
Rows [][]interface{}
|
||||
}
|
||||
|
||||
type InsertElement struct {
|
||||
Table string
|
||||
Row []interface{}
|
||||
}
|
||||
|
||||
type InsertBuffer struct {
|
||||
In chan InsertElement
|
||||
Out chan InsertBatch
|
||||
wg *sync.WaitGroup
|
||||
}
|
||||
|
||||
func NewInsertBuffer() *InsertBuffer {
|
||||
ib := InsertBuffer{
|
||||
In: make(chan InsertElement, 64),
|
||||
Out: make(chan InsertBatch, 1),
|
||||
wg: &sync.WaitGroup{},
|
||||
}
|
||||
ib.wg.Add(1)
|
||||
go ib.loop()
|
||||
return &ib
|
||||
}
|
||||
|
||||
func (ib *InsertBuffer) Close() {
|
||||
close(ib.In)
|
||||
ib.wg.Wait()
|
||||
close(ib.Out)
|
||||
}
|
||||
|
||||
func (ib *InsertBuffer) Insert(table string, row []interface{}) {
|
||||
ib.In <- InsertElement{table, row}
|
||||
}
|
||||
|
||||
func (ib *InsertBuffer) loop() {
|
||||
buffer := make(map[string]*InsertBatch)
|
||||
|
||||
for elem := range ib.In {
|
||||
if batch, ok := buffer[elem.Table]; ok {
|
||||
batch.Rows = append(batch.Rows, elem.Row)
|
||||
} else {
|
||||
buffer[elem.Table] = &InsertBatch{elem.Table, [][]interface{}{elem.Row}}
|
||||
}
|
||||
if len(buffer[elem.Table].Rows) > bufferSize {
|
||||
batch := buffer[elem.Table]
|
||||
delete(buffer, elem.Table)
|
||||
ib.Out <- *batch
|
||||
}
|
||||
}
|
||||
for table, batch := range buffer {
|
||||
delete(buffer, table)
|
||||
ib.Out <- *batch
|
||||
}
|
||||
ib.wg.Done()
|
||||
}
|
Loading…
Reference in New Issue