CSV Bulk Import Tutorial
Use the public engine API to stream a fixed-schema CSV file into a SQL table with batched inserts, explicit transactions, and secondary indexes created after the load.
BULK INSERT / COPY surface. The recommended relational bulk-ingest path today is Database.OpenAsync(...) + PrepareInsertBatch(...) + explicit transaction batching.
When to use this pattern
This approach is a good fit when you already know the target schema, want deterministic row conversion logic, and care more about throughput than about exposing a generic import surface. It keeps the logic on top of the existing public API instead of introducing a new product feature.
- Use it for fixed-schema CSV feeds, benchmark seeding, sample data, and one-off operational imports.
- Use
PrepareInsertBatch(...)to bypass repeated SQL parsing and reduce per-row allocation churn. - Use explicit transaction batching so one durable commit covers many rows instead of one row.
- Create secondary indexes after the load when you control the schema and want the faster ingest path.
Run the sample
The runnable example lives in samples/csv-bulk-import.
dotnet run --project samples/csv-bulk-import/CsvBulkImportSample.csproj
Optional flags let you override the input CSV, output database path, and batch size:
dotnet run --project samples/csv-bulk-import/CsvBulkImportSample.csproj -- \
--csv-path samples/csv-bulk-import/events.csv \
--database-path artifacts/samples/csv-import-demo.db \
--batch-size 1000
Create the schema first
The sample creates the table in code before it reads any CSV rows. That keeps the tutorial focused on ingest mechanics rather than generic schema inference.
var dbOptions = new DatabaseOptions()
.ConfigureStorageEngine(builder => builder.UseWriteOptimizedPreset());
await using var db = await Database.OpenAsync(databasePath, dbOptions);
await db.ExecuteAsync("""
CREATE TABLE events (
id INTEGER PRIMARY KEY,
timestamp_utc TEXT NOT NULL,
source TEXT NOT NULL,
severity TEXT NOT NULL,
category TEXT NOT NULL,
payload_size INTEGER NOT NULL,
ingest_ms REAL NOT NULL
)
""");
UseWriteOptimizedPreset() is the recommended starting point for durable file-backed ingest. The main throughput win still comes from batching and transaction shape, not from trying to tune advanced knobs first.
Stream and validate the CSV
The sample reads the file line-by-line, validates the header case-insensitively, and fails fast if a row cannot be converted into the expected schema. This keeps bad data visible and ties each parsing failure back to a concrete line number.
string? headerLine = await reader.ReadLineAsync();
if (headerLine is null)
throw new FormatException("CSV file is empty.");
ValidateHeader(headerLine);
DbValue[] row = ConvertRow(fields, lineNumber);
batch.AddRow(row);
In the sample, each row becomes a fixed DbValue[] containing an integer primary key, ISO-8601 timestamp text, categorical text fields, an integer payload size, and a real-valued ingest latency.
Batch rows into explicit transactions
This is the core of the sample. Rows are buffered into one reusable InsertBatch, and each flush is wrapped in an explicit transaction.
var batch = db.PrepareInsertBatch("events", batchSize);
while (await reader.ReadLineAsync() is { } line)
{
lineNumber++;
if (string.IsNullOrWhiteSpace(line))
continue;
string[] fields = ParseCsvLine(line, lineNumber);
batch.AddRow(ConvertRow(fields, lineNumber));
if (batch.Count >= batchSize)
importedRows += await FlushBatchAsync(db, batch);
}
await db.BeginTransactionAsync();
try
{
int rows = await batch.ExecuteAsync();
await db.CommitAsync();
return rows;
}
catch
{
await db.RollbackAsync();
throw;
}
This pattern reduces the number of durable commits dramatically compared with issuing one auto-commit INSERT per row. It is the current public equivalent of a bulk-load path for relational tables.
Build indexes after the load
Secondary indexes are created only after all CSV rows are on disk. That avoids maintaining them on every insert while the table is still being filled.
await db.ExecuteAsync("CREATE INDEX idx_events_timestamp_utc ON events (timestamp_utc)");
await db.ExecuteAsync("CREATE INDEX idx_events_severity ON events (severity)");
await db.ExecuteAsync("CREATE INDEX idx_events_source ON events (source)");
If you need a reusable operational import tool later, this is the first behavior to preserve: fixed conversion rules up front, batched row writes in the middle, and index creation after the data load.
Inspect the result with the CLI
The sample prints the database path after the import completes. Use the CLI to inspect the rows and confirm the indexes are paying off for the queries you care about.
dotnet run --project src/CSharpDB.Cli -- artifacts/samples/csv-import-demo.db
SELECT COUNT(*) FROM events;
SELECT severity, COUNT(*) FROM events GROUP BY severity ORDER BY severity;
SELECT source, AVG(ingest_ms) FROM events GROUP BY source ORDER BY source;
SELECT * FROM events ORDER BY timestamp_utc LIMIT 5;