ETL Pipelines
Built-in pipeline runtime for Extract-Transform-Load workflows. Define packages in JSON, validate them, serialize them, execute them in batches, and inspect run metrics.
Overview
The pipeline system connects sources to destinations through a chain of transforms. Pipelines are defined as JSON package files and executed by the PipelineOrchestrator. The built-in runtime supports CSV and JSON file connectors.
Pipeline Flow
Source (CSV, JSON) → Transform Chain → Destination (CSV, JSON) → Run History & Diagnostics
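The flow above can be sketched in a few lines. This is an illustrative Python model, not the actual PipelineOrchestrator: rows stream from a source, pass through the transform chain in order, and land at the destination, with the row count feeding run history.

```python
# Illustrative sketch of the pipeline flow (hypothetical helper names,
# not the CSharpDB API): source -> transform chain -> destination.

def run_pipeline(read_source, transforms, write_destination):
    rows = read_source()
    for transform in transforms:   # transforms apply in declared order
        rows = transform(rows)
    write_destination(rows)
    return len(rows)               # row count feeds run history & diagnostics
```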
Sources
| Source | Description |
|---|---|
| CsvFile | Built-in file source with configurable header handling |
| JsonFile | Built-in file source for JSON documents and arrays |
Transforms
| Transform | Description |
|---|---|
| Select | Pass through only the named columns |
| Rename | Rename columns with explicit source/target mappings |
| Cast | Convert columns to CSharpDB primitive types |
| Filter | Keep rows that satisfy the filter expression |
| Derive | Add new columns from simple expressions or literals |
| Deduplicate | Remove duplicate rows using one or more key columns |
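The semantics of each transform can be modeled on plain rows. The sketch below is illustrative Python, not the CSharpDB runtime: `filter_rows` takes a predicate standing in for the filter-expression evaluator, `derive` takes literal values rather than expressions, and the type-name-to-caster table inside `cast` is an assumption.

```python
# Illustrative models of the six built-in transforms, operating on
# lists of dict rows. Not the actual CSharpDB implementation.

def select(rows, columns):
    """Pass through only the named columns."""
    return [{c: r[c] for c in columns} for r in rows]

def rename(rows, mappings):
    """mappings: list of {"source": ..., "target": ...} entries."""
    table = {m["source"]: m["target"] for m in mappings}
    return [{table.get(k, k): v for k, v in r.items()} for r in rows]

def cast(rows, mappings):
    """Assumed mapping of CSharpDB type names to Python casters."""
    casters = {"integer": int, "text": str}
    out = []
    for r in rows:
        r = dict(r)
        for m in mappings:
            r[m["column"]] = casters[m["targetType"]](r[m["column"]])
        out.append(r)
    return out

def filter_rows(rows, predicate):
    """predicate stands in for the runtime's filter-expression evaluator."""
    return [r for r in rows if predicate(r)]

def derive(rows, derived):
    """derived: list of {"name": ..., "value": literal} (expressions elided)."""
    return [{**r, **{d["name"]: d["value"] for d in derived}} for r in rows]

def deduplicate(rows, keys):
    """Keep the first row seen for each key-column tuple."""
    seen, out = set(), []
    for r in rows:
        k = tuple(r[c] for c in keys)
        if k not in seen:
            seen.add(k)
            out.append(r)
    return out
```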
Destinations
| Destination | Description |
|---|---|
| CsvFile | Built-in file destination for CSV output |
| JsonFile | Built-in file destination for JSON output |
Pipeline Package Definition
Pipelines are defined as JSON packages that describe the source, transforms, and destination.
```json
{
  "name": "customers-csv-to-json",
  "version": "1.0.0",
  "source": {
    "kind": "csvFile",
    "path": "data/customers.csv",
    "hasHeaderRow": true
  },
  "transforms": [
    { "kind": "select", "selectColumns": ["id", "name", "status"] },
    { "kind": "rename", "renameMappings": [{ "source": "name", "target": "full_name" }] },
    { "kind": "cast", "castMappings": [{ "column": "id", "targetType": "integer" }] },
    { "kind": "filter", "filterExpression": "status == 'active'" },
    { "kind": "derive", "derivedColumns": [{ "name": "import_source", "expression": "'csv'" }] },
    { "kind": "deduplicate", "deduplicateKeys": ["id"] }
  ],
  "destination": {
    "kind": "jsonFile",
    "path": "data/customers.cleaned.json"
  },
  "options": {
    "batchSize": 500,
    "errorMode": "failFast",
    "checkpointInterval": 1
  }
}
```
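A sketch of how the `options` block might govern a run, under stated assumptions: `batchSize` bounds rows per batch, `errorMode` decides whether a bad row aborts the run (`failFast`) or is collected as a reject, and `checkpointInterval` counts batches between checkpoints. The non-failFast branch and its mode name are assumptions; the document only shows `failFast`.

```python
# Hedged model of batch execution under the package options. The reject
# path for non-failFast modes is an assumption, not documented behavior.

def execute(rows, process_row, options, checkpoints, rejects):
    batch_size = options["batchSize"]
    done = 0
    for i in range(0, len(rows), batch_size):
        for row in rows[i:i + batch_size]:
            try:
                process_row(row)
                done += 1
            except Exception as exc:
                if options["errorMode"] == "failFast":
                    raise              # abort the run on the first bad row
                rejects.append((row, str(exc)))  # capture for inspection
        batch_no = i // batch_size + 1
        if batch_no % options["checkpointInterval"] == 0:
            checkpoints.append(done)   # persisted progress enables Resume
    return done
```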
Execution Modes
| Mode | Description |
|---|---|
| Validate | Check the pipeline definition for errors without executing |
| DryRun | Open the source and run transforms, but skip destination writes |
| Run | Execute the full pipeline and persist checkpoints |
| Resume | Resume a failed pipeline from the last checkpoint |
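The four modes differ only in how far down the pipeline they go. The dispatch below is a hedged sketch using the mode names from the table; the control flow is an assumption, not the orchestrator's actual code.

```python
# Illustrative dispatch over the four execution modes.

def run(mode, validate, read, transform, write, checkpoint=None):
    errors = validate()
    if errors or mode == "Validate":
        return errors                    # Validate: stop after checks
    rows = read(checkpoint if mode == "Resume" else None)
    rows = [transform(r) for r in rows]  # DryRun/Run/Resume all transform
    if mode != "DryRun":
        write(rows)                      # DryRun skips destination writes
    return []
```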
Management
Pipelines can be managed through multiple interfaces:
- Admin UI — Visual pipeline designer with drag-and-drop nodes, connection lines, and properties panel
- REST API — Pipeline CRUD and execution endpoints
- Client SDK — `CSharpDbPipelineRunner` and `CSharpDbPipelineStorage` classes
- CLI — Command-line pipeline execution and inspection
Run History
Every pipeline execution is tracked with metadata including start/end times, row counts, error details, and reject records. Failed rows are captured for inspection and recovery.