From 284ca3b29263a8b57e10096d350c134cf21853fb Mon Sep 17 00:00:00 2001 From: Zaman Gabdrakhmanov Date: Tue, 19 Aug 2025 15:25:22 +0300 Subject: [PATCH 1/5] add: output unit test with bug --- .../output/general/test/unit_test.go | 55 +++++++++++++++++++ 1 file changed, 55 insertions(+) diff --git a/internal/generator/output/general/test/unit_test.go b/internal/generator/output/general/test/unit_test.go index 4e7a91f..2b789c8 100644 --- a/internal/generator/output/general/test/unit_test.go +++ b/internal/generator/output/general/test/unit_test.go @@ -14,6 +14,9 @@ import ( "github.com/tarantool/sdvg/internal/generator/common" "github.com/tarantool/sdvg/internal/generator/models" outputGeneral "github.com/tarantool/sdvg/internal/generator/output/general" + "github.com/tarantool/sdvg/internal/generator/output/general/writer" + outputCsv "github.com/tarantool/sdvg/internal/generator/output/general/writer/csv" + outputParquet "github.com/tarantool/sdvg/internal/generator/output/general/writer/parquet" "github.com/tarantool/sdvg/internal/generator/usecase" useCaseGeneral "github.com/tarantool/sdvg/internal/generator/usecase/general" ) @@ -264,6 +267,58 @@ cause: dir for model is not empty } } +// TestWriterInitTeardown tests if Teardown works properly right after Init +func TestWriterInitTeardown(t *testing.T) { + tmpDir := t.TempDir() + + testCases := []struct { + name string + writer writer.Writer + }{ + { + "csv", + outputCsv.NewWriter( + context.TODO(), + nil, + //w.config.CSVParams, + &models.CSVConfig{ + FloatPrecision: 1, + DatetimeFormat: "2006-01-02T15:04:05Z07:00", + Delimiter: ",", + WithoutHeaders: false, + }, + nil, + tmpDir, + false, + make(chan<- uint64), + ), + }, + { + "parquet", + outputParquet.NewWriter( + nil, + &models.ParquetConfig{ + "UNCOMPRESSED", + 2, + models.ParquetDateTimeMillisFormat, + }, + nil, + outputParquet.NewFileSystem(), + tmpDir, + false, + make(chan<- uint64), + ), + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + require.NoError(t, tc.writer.Init()) + require.NoError(t, tc.writer.Teardown()) + }) + } +} + //nolint:lll func generate(t *testing.T, cfg *models.GenerationConfig, uc usecase.UseCase, continueGeneration, forceGeneration bool) error { t.Helper() From b22a1cc7649e9fc9b5e437b74e7be3cc5ebeb1fe Mon Sep 17 00:00:00 2001 From: Zaman Gabdrakhmanov Date: Tue, 19 Aug 2025 15:33:32 +0300 Subject: [PATCH 2/5] fix bug in csv.go --- .../output/general/writer/csv/csv.go | 28 +++++++++++++------ 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/internal/generator/output/general/writer/csv/csv.go b/internal/generator/output/general/writer/csv/csv.go index 066e300..af58028 100644 --- a/internal/generator/output/general/writer/csv/csv.go +++ b/internal/generator/output/general/writer/csv/csv.go @@ -41,6 +41,8 @@ type Writer struct { fileDescriptor *os.File csvWriter *stdCSV.Writer flushTicker *time.Ticker + flushWg *sync.WaitGroup + flushStopChan chan struct{} totalWrittenRows uint64 bufferedRows uint64 @@ -51,7 +53,6 @@ type Writer struct { writerWg *sync.WaitGroup writerMutex *sync.Mutex started bool - stopChan chan struct{} } // NewWriter function creates Writer object. @@ -72,13 +73,14 @@ func NewWriter( outputPath: outputPath, continueGeneration: continueGeneration, flushTicker: time.NewTicker(flushInterval), + flushWg: &sync.WaitGroup{}, + flushStopChan: make(chan struct{}), writtenRowsChan: writtenRowsChan, writerChan: make(chan *models.DataRow), errorsChan: make(chan error, 1), writerWg: &sync.WaitGroup{}, writerMutex: &sync.Mutex{}, started: false, - stopChan: make(chan struct{}), } } @@ -104,6 +106,7 @@ func (w *Writer) Init() error { w.started = true w.writerWg.Add(1) + w.flushWg.Add(1) go w.writer() go w.flusher() @@ -142,9 +145,11 @@ func (w *Writer) writer() { } func (w *Writer) flusher() { + defer w.flushWg.Done() + for { select { - case <-w.stopChan: + case <-w.flushStopChan: return case <-w.flushTicker.C: if w.csvWriter != nil { @@ -427,14 +432,21 @@ func (w *Writer) Teardown() error { w.writerWg.Wait() w.flushTicker.Stop() - w.stopChan <- struct{}{} + close(w.flushStopChan) + w.flushWg.Wait() - if err := w.flush(); err != nil { - return err + w.writerMutex.Lock() + defer w.writerMutex.Unlock() + if w.csvWriter != nil { + if err := w.flush(); err != nil { + return err + } } - if err := w.fileDescriptor.Close(); err != nil { - return errors.New(err.Error()) + if w.fileDescriptor != nil { + if err := w.fileDescriptor.Close(); err != nil { + return errors.New(err.Error()) + } } select { From b74d499a16e6091ce8a90cd9ef75ebd2e039dca2 Mon Sep 17 00:00:00 2001 From: Zaman Gabdrakhmanov Date: Tue, 19 Aug 2025 15:44:46 +0300 Subject: [PATCH 3/5] upd: csv and parquet with test of bug --- .../output/general/test/unit_test.go | 2 +- .../output/general/writer/csv/csv.go | 3 +- .../output/general/writer/parquet/parquet.go | 37 +++++++++++++------ 3 files changed, 28 insertions(+), 14 deletions(-) diff --git a/internal/generator/output/general/test/unit_test.go b/internal/generator/output/general/test/unit_test.go index 2b789c8..5b1869b 100644 --- a/internal/generator/output/general/test/unit_test.go +++ b/internal/generator/output/general/test/unit_test.go @@ -296,7 +296,7 @@ func TestWriterInitTeardown(t *testing.T) { { "parquet", outputParquet.NewWriter( - nil, + &models.Model{Columns: make([]*models.Column, 0)}, &models.ParquetConfig{ "UNCOMPRESSED", 2, diff --git a/internal/generator/output/general/writer/csv/csv.go b/internal/generator/output/general/writer/csv/csv.go index af58028..7d76084 100644 --- a/internal/generator/output/general/writer/csv/csv.go +++ b/internal/generator/output/general/writer/csv/csv.go @@ -436,13 +436,14 @@ func (w *Writer) Teardown() error { w.flushWg.Wait() w.writerMutex.Lock() - defer w.writerMutex.Unlock() if w.csvWriter != nil { + w.writerMutex.Unlock() if err := w.flush(); err != nil { return err } } + w.writerMutex.TryLock() if w.fileDescriptor != nil { if err := w.fileDescriptor.Close(); err != nil { return errors.New(err.Error()) diff --git a/internal/generator/output/general/writer/parquet/parquet.go b/internal/generator/output/general/writer/parquet/parquet.go index da0f362..a84ba78 100644 --- a/internal/generator/output/general/writer/parquet/parquet.go +++ b/internal/generator/output/general/writer/parquet/parquet.go @@ -28,6 +28,9 @@ import ( const ( flushInterval = 5 * time.Second + //nolint:godox + // TODO: find optimal value, or calculate it to flush on disk 512Mb data + recordBuilderReserve = 5000 ) var ( @@ -68,7 +71,10 @@ type Writer struct { parquetWriter *pqarrow.FileWriter writerProperties *parquet.WriterProperties recordBuilder *array.RecordBuilder - flushTicker *time.Ticker + + flushTicker *time.Ticker + flushWg *sync.WaitGroup + flushStopChan chan struct{} totalWrittenRows uint64 bufferedRows uint64 @@ -77,7 +83,6 @@ type Writer struct { errorChan chan error writerMutex *sync.Mutex started bool - stopCh chan struct{} } type FileSystem interface { @@ -106,11 +111,12 @@ func NewWriter( continueGeneration: continueGeneration, fs: fs, flushTicker: time.NewTicker(flushInterval), + flushWg: &sync.WaitGroup{}, + flushStopChan: make(chan struct{}), writtenRowsChan: writtenRowsChan, errorChan: make(chan error), writerMutex: &sync.Mutex{}, started: false, - stopCh: make(chan struct{}), } } @@ -273,10 +279,9 @@ func (w *Writer) Init() error { w.parquetModelSchema = modelSchema w.writerProperties = parquet.NewWriterProperties(writerProperties...) + w.recordBuilder = array.NewRecordBuilder(memory.DefaultAllocator, w.parquetModelSchema) - //nolint:mnd,godox - // TODO: find optimal value, or calculate it to flush on disk 512Mb data - w.recordBuilder.Reserve(5000) + w.recordBuilder.Reserve(recordBuilderReserve) if err = os.MkdirAll(w.outputPath, os.ModePerm); err != nil { return errors.New(err.Error()) @@ -301,7 +306,7 @@ func (w *Writer) Init() error { func (w *Writer) flusher() { for { select { - case <-w.stopCh: + case <-w.flushStopChan: return case <-w.flushTicker.C: //nolint:godox @@ -661,14 +666,22 @@ func (w *Writer) WriteRow(row *models.DataRow) error { // Teardown function waits recording finish and stops parquet writer and closes opened file descriptor. func (w *Writer) Teardown() error { w.flushTicker.Stop() - w.stopCh <- struct{}{} + w.flushStopChan <- struct{}{} + w.flushWg.Wait() - if err := w.flush(); err != nil { - return errors.New(err.Error()) + w.writerMutex.Lock() + if w.recordBuilder != nil && w.parquetWriter != nil { + w.writerMutex.Unlock() + if err := w.flush(); err != nil { + return errors.New(err.Error()) + } } - if err := w.parquetWriter.Close(); err != nil { - return errors.New(err.Error()) + w.writerMutex.TryLock() + if w.parquetWriter != nil { + if err := w.parquetWriter.Close(); err != nil { + return errors.New(err.Error()) + } } select { From cc79610c992e55f4d7f6a9831eaf80ea11f1ea24 Mon Sep 17 00:00:00 2001 From: Zaman Gabdrakhmanov Date: Tue, 19 Aug 2025 15:52:17 +0300 Subject: [PATCH 4/5] fix linter --- internal/generator/output/general/test/unit_test.go | 9 ++++----- internal/generator/output/general/writer/csv/csv.go | 2 ++ .../generator/output/general/writer/parquet/parquet.go | 4 +++- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/internal/generator/output/general/test/unit_test.go b/internal/generator/output/general/test/unit_test.go index 5b1869b..c5a3bc2 100644 --- a/internal/generator/output/general/test/unit_test.go +++ b/internal/generator/output/general/test/unit_test.go @@ -267,7 +267,7 @@ cause: dir for model is not empty } } -// TestWriterInitTeardown tests if Teardown works properly right after Init +// TestWriterInitTeardown tests if Teardown works properly right after Init. func TestWriterInitTeardown(t *testing.T) { tmpDir := t.TempDir() @@ -280,7 +280,6 @@ func TestWriterInitTeardown(t *testing.T) { outputCsv.NewWriter( context.TODO(), nil, - //w.config.CSVParams, &models.CSVConfig{ FloatPrecision: 1, DatetimeFormat: "2006-01-02T15:04:05Z07:00", @@ -298,9 +297,9 @@ func TestWriterInitTeardown(t *testing.T) { outputParquet.NewWriter( &models.Model{Columns: make([]*models.Column, 0)}, &models.ParquetConfig{ - "UNCOMPRESSED", - 2, - models.ParquetDateTimeMillisFormat, + CompressionCodec: "UNCOMPRESSED", + FloatPrecision: 2, + DateTimeFormat: models.ParquetDateTimeMillisFormat, }, nil, outputParquet.NewFileSystem(), diff --git a/internal/generator/output/general/writer/csv/csv.go b/internal/generator/output/general/writer/csv/csv.go index 7d76084..357d3db 100644 --- a/internal/generator/output/general/writer/csv/csv.go +++ b/internal/generator/output/general/writer/csv/csv.go @@ -438,12 +438,14 @@ func (w *Writer) Teardown() error { w.writerMutex.Lock() if w.csvWriter != nil { w.writerMutex.Unlock() + if err := w.flush(); err != nil { return err } } w.writerMutex.TryLock() + if w.fileDescriptor != nil { if err := w.fileDescriptor.Close(); err != nil { return errors.New(err.Error()) diff --git a/internal/generator/output/general/writer/parquet/parquet.go b/internal/generator/output/general/writer/parquet/parquet.go index a84ba78..f1e2e8a 100644 --- a/internal/generator/output/general/writer/parquet/parquet.go +++ b/internal/generator/output/general/writer/parquet/parquet.go @@ -29,7 +29,7 @@ import ( const ( flushInterval = 5 * time.Second //nolint:godox - // TODO: find optimal value, or calculate it to flush on disk 512Mb data + // TODO: find optimal value, or calculate it to flush on disk 512Mb data. recordBuilderReserve = 5000 ) @@ -672,12 +672,14 @@ func (w *Writer) Teardown() error { w.writerMutex.Lock() if w.recordBuilder != nil && w.parquetWriter != nil { w.writerMutex.Unlock() + if err := w.flush(); err != nil { return errors.New(err.Error()) } } w.writerMutex.TryLock() + if w.parquetWriter != nil { if err := w.parquetWriter.Close(); err != nil { return errors.New(err.Error()) From 5edfaf8d23aa96a030a86a1368a80a47ee2739c5 Mon Sep 17 00:00:00 2001 From: Zaman Gabdrakhmanov Date: Wed, 20 Aug 2025 01:53:12 +0300 Subject: [PATCH 5/5] rm redundant writerMutex.Lock --- internal/generator/output/general/writer/csv/csv.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/internal/generator/output/general/writer/csv/csv.go b/internal/generator/output/general/writer/csv/csv.go index 357d3db..fc10477 100644 --- a/internal/generator/output/general/writer/csv/csv.go +++ b/internal/generator/output/general/writer/csv/csv.go @@ -435,17 +435,12 @@ func (w *Writer) Teardown() error { close(w.flushStopChan) w.flushWg.Wait() - w.writerMutex.Lock() if w.csvWriter != nil { - w.writerMutex.Unlock() - if err := w.flush(); err != nil { return err } } - w.writerMutex.TryLock() - if w.fileDescriptor != nil { if err := w.fileDescriptor.Close(); err != nil { return errors.New(err.Error())