From d387f3cd246fa1ff35f356ca771f1163b94cb24f Mon Sep 17 00:00:00 2001 From: Oliver Tonnhofer Date: Tue, 29 Oct 2013 15:07:26 +0100 Subject: [PATCH] remove unused InsertBuffer --- writer/buffer.go | 66 ------------------------------------------------ 1 file changed, 66 deletions(-) delete mode 100644 writer/buffer.go diff --git a/writer/buffer.go b/writer/buffer.go deleted file mode 100644 index 211ea06..0000000 --- a/writer/buffer.go +++ /dev/null @@ -1,66 +0,0 @@ -package writer - -import ( - "sync" -) - -const bufferSize = 1024 * 8 - -type InsertBatch struct { - Table string - Rows [][]interface{} -} - -type InsertElement struct { - Table string - Row []interface{} -} - -type InsertBuffer struct { - In chan InsertElement - Out chan InsertBatch - wg *sync.WaitGroup -} - -func NewInsertBuffer() *InsertBuffer { - ib := InsertBuffer{ - In: make(chan InsertElement, 64), - Out: make(chan InsertBatch, 1), - wg: &sync.WaitGroup{}, - } - ib.wg.Add(1) - go ib.loop() - return &ib -} - -func (ib *InsertBuffer) Close() { - close(ib.In) - ib.wg.Wait() - close(ib.Out) -} - -func (ib *InsertBuffer) Insert(table string, row []interface{}) { - ib.In <- InsertElement{table, row} -} - -func (ib *InsertBuffer) loop() { - buffer := make(map[string]*InsertBatch) - - for elem := range ib.In { - if batch, ok := buffer[elem.Table]; ok { - batch.Rows = append(batch.Rows, elem.Row) - } else { - buffer[elem.Table] = &InsertBatch{elem.Table, [][]interface{}{elem.Row}} - } - if len(buffer[elem.Table].Rows) > bufferSize { - batch := buffer[elem.Table] - delete(buffer, elem.Table) - ib.Out <- *batch - } - } - for table, batch := range buffer { - delete(buffer, table) - ib.Out <- *batch - } - ib.wg.Done() -}