ETL Pipelines

Built-in pipeline runtime for Extract-Transform-Load workflows. Define packages in JSON, validate them, serialize them, execute them in batches, and inspect run metrics.

Overview

The pipeline system connects sources to destinations through a chain of transforms. Pipelines are defined as JSON package files and executed by the PipelineOrchestrator. The built-in runtime supports CSV and JSON file connectors.

Pipeline Flow

Source (CSV, JSON)
  ↓
Transform Chain
  ↓
Destination (CSV, JSON)
  ↓
Run History & Diagnostics

Sources

Source     Description
CsvFile    Built-in file source with configurable header handling
JsonFile   Built-in file source for JSON documents and arrays

Transforms

Transform     Description
Select        Pass through only the named columns
Rename        Rename columns with explicit source/target mappings
Cast          Convert columns to CSharpDB primitive types
Filter        Keep rows that satisfy the filter expression
Derive        Add new columns from simple expressions or literals
Deduplicate   Remove duplicate rows using one or more key columns
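The six transform kinds above can be modeled on rows represented as dicts. The sketch below illustrates the documented semantics only; it is not the CSharpDB runtime API, and filter predicates and derive expressions are simplified to Python callables and literals.

```python
# Minimal model of the transform chain semantics (not the actual runtime).

CASTERS = {"integer": int}  # illustrative subset of primitive types

def select(rows, columns):
    # Pass through only the named columns.
    return [{c: r[c] for c in columns} for r in rows]

def rename(rows, mappings):
    # mappings: [{"source": ..., "target": ...}]
    targets = {m["source"]: m["target"] for m in mappings}
    return [{targets.get(k, k): v for k, v in r.items()} for r in rows]

def cast(rows, mappings):
    # mappings: [{"column": ..., "targetType": ...}]
    out = []
    for r in rows:
        r = dict(r)
        for m in mappings:
            r[m["column"]] = CASTERS[m["targetType"]](r[m["column"]])
        out.append(r)
    return out

def filter_rows(rows, predicate):
    # Keep rows that satisfy the predicate.
    return [r for r in rows if predicate(r)]

def derive(rows, derived):
    # derived: [{"name": ..., "value": literal}]
    return [{**r, **{d["name"]: d["value"] for d in derived}} for r in rows]

def deduplicate(rows, keys):
    # Keep the first row seen for each key tuple.
    seen, out = set(), []
    for r in rows:
        key = tuple(r[c] for c in keys)
        if key not in seen:
            seen.add(key)
            out.append(r)
    return out

rows = [
    {"id": "1", "name": "Ada", "status": "active", "extra": "x"},
    {"id": "2", "name": "Bob", "status": "inactive", "extra": "y"},
    {"id": "1", "name": "Ada", "status": "active", "extra": "z"},
]
rows = select(rows, ["id", "name", "status"])
rows = rename(rows, [{"source": "name", "target": "full_name"}])
rows = cast(rows, [{"column": "id", "targetType": "integer"}])
rows = filter_rows(rows, lambda r: r["status"] == "active")
rows = derive(rows, [{"name": "import_source", "value": "csv"}])
rows = deduplicate(rows, ["id"])
```

After the chain runs, the three input rows collapse to a single cleaned row: the duplicate and the inactive row are gone, `name` has become `full_name`, `id` is an integer, and `import_source` has been added.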

Destinations

Destination   Description
CsvFile       Built-in file destination for CSV output
JsonFile      Built-in file destination for JSON output

Pipeline Package Definition

Pipelines are defined as JSON packages that describe the source, transforms, and destination.

{
  "name": "customers-csv-to-json",
  "version": "1.0.0",
  "source": {
    "kind": "csvFile",
    "path": "data/customers.csv",
    "hasHeaderRow": true
  },
  "transforms": [
    { "kind": "select", "selectColumns": ["id", "name", "status"] },
    { "kind": "rename", "renameMappings": [{ "source": "name", "target": "full_name" }] },
    { "kind": "cast", "castMappings": [{ "column": "id", "targetType": "integer" }] },
    { "kind": "filter", "filterExpression": "status == 'active'" },
    { "kind": "derive", "derivedColumns": [{ "name": "import_source", "expression": "'csv'" }] },
    { "kind": "deduplicate", "deduplicateKeys": ["id"] }
  ],
  "destination": {
    "kind": "jsonFile",
    "path": "data/customers.cleaned.json"
  },
  "options": {
    "batchSize": 500,
    "errorMode": "failFast",
    "checkpointInterval": 1
  }
}
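A package like the one above can be sanity-checked before execution. The rules below are assumptions inferred from the package fields shown in this document, not the runtime's actual Validate-mode checks.

```python
# Illustrative package validation (assumed rules, not the built-in validator).
import json

KNOWN_SOURCES = {"csvFile", "jsonFile"}
KNOWN_TRANSFORMS = {"select", "rename", "cast", "filter", "derive", "deduplicate"}
KNOWN_DESTINATIONS = {"csvFile", "jsonFile"}

def validate_package(pkg):
    errors = []
    for key in ("name", "source", "destination"):
        if key not in pkg:
            errors.append(f"missing required field: {key}")
    if pkg.get("source", {}).get("kind") not in KNOWN_SOURCES:
        errors.append("unknown source kind")
    for t in pkg.get("transforms", []):
        if t.get("kind") not in KNOWN_TRANSFORMS:
            errors.append(f"unknown transform kind: {t.get('kind')}")
    if pkg.get("destination", {}).get("kind") not in KNOWN_DESTINATIONS:
        errors.append("unknown destination kind")
    return errors

pkg = json.loads(
    '{"name": "demo", "source": {"kind": "csvFile", "path": "in.csv"},'
    ' "transforms": [{"kind": "select", "selectColumns": ["id"]}],'
    ' "destination": {"kind": "jsonFile", "path": "out.json"}}'
)
errors = validate_package(pkg)  # [] for a well-formed package
```

This mirrors the spirit of Validate mode described below: inspect the definition and report problems without opening the source or writing to the destination.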

Execution Modes

Mode       Description
Validate   Check the pipeline definition for errors without executing
DryRun     Open the source and run transforms, but skip destination writes
Run        Execute the full pipeline and persist checkpoints
Resume     Resume a failed pipeline from the last checkpoint
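The Run and Resume modes can be sketched in terms of the batchSize and checkpointInterval options. The semantics assumed here, a checkpoint recorded every checkpointInterval batches, and a resumed run skipping every batch at or before the last recorded checkpoint, are an interpretation of those options, not the runtime's documented behavior.

```python
# Hedged sketch of batched execution with checkpointing and resume.

def run_batches(rows, batch_size, checkpoint_interval, write, checkpoint,
                resume_from=-1):
    # resume_from is the index of the last checkpointed batch (-1 = fresh run).
    for start in range(0, len(rows), batch_size):
        batch_index = start // batch_size
        if batch_index <= resume_from:
            continue  # this batch was persisted before the failure
        write(rows[start:start + batch_size])
        if (batch_index + 1) % checkpoint_interval == 0:
            checkpoint(batch_index)

data = list(range(10))

# Fresh run: batch_size=2, checkpoint after every batch (checkpointInterval=1).
written, checkpoints = [], []
run_batches(data, 2, 1, written.extend, checkpoints.append)

# Resume after a crash that left batch 2 as the last checkpoint:
# only batches 3 and 4 are replayed.
resumed = []
run_batches(data, 2, 1, resumed.extend, lambda i: None, resume_from=2)
```

A checkpointInterval of 1, as in the package example above, trades checkpoint overhead for minimal replay on resume.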

Management

Pipelines can be managed through multiple interfaces:

  • Admin UI — Visual pipeline designer with drag-and-drop nodes, connection lines, and properties panel
  • REST API — Pipeline CRUD and execution endpoints
  • Client SDK — CSharpDbPipelineRunner and CSharpDbPipelineStorage classes
  • CLI — Command-line pipeline execution and inspection

Run History

Every pipeline execution is tracked with metadata including start/end times, row counts, error details, and reject records. Failed rows are captured for inspection and recovery.
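The run-record schema itself is not shown in this document. Purely as an illustration of the metadata listed above, a tracked run might carry fields like the following; every field name here is a hypothetical example, not the actual schema.

```json
{
  "pipeline": "customers-csv-to-json",
  "status": "failed",
  "startedAt": "2024-05-01T12:00:00Z",
  "endedAt": "2024-05-01T12:00:04Z",
  "rowsRead": 500,
  "rowsWritten": 420,
  "rejectedRows": [
    { "row": { "id": "abc" }, "error": "cast to integer failed for column 'id'" }
  ]
}
```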