High-performance ETL pipeline library for .NET that actually gets out of your way.
DataFlow is a streaming data pipeline library designed for processing large datasets without blowing up your memory. Think LINQ but for ETL operations - read data from anywhere, transform it, and write it somewhere else. No XML configs, no enterprise architect nonsense. Just simple, chainable operations that work.
dotnet add package DataFlow.Core

Or grab it from NuGet if you're using Visual Studio.
Process a CSV file in three lines:
DataFlow.From.Csv("input.csv")
.Filter(row => row["Status"] == "Active")
.WriteToCsv("output.csv");That's the entire API philosophy. Read, transform, write.
// CSV files
var pipeline = DataFlow.From.Csv("data.csv");
// JSON files
var pipeline = DataFlow.From.Json("data.json");
// Excel spreadsheets
var pipeline = DataFlow.From.Excel("report.xlsx", sheet: "Sales");
// SQL databases
var pipeline = DataFlow.From.Sql(connectionString)
.Query("SELECT * FROM Orders WHERE Created > @date", new { date = DateTime.Today });
// In-memory collections
var pipeline = DataFlow.From.Collection(myList);

Chain operations like you would with LINQ:
pipeline
.Filter(row => row.GetValue<decimal>("Price") > 0)
.Map(row => new {
Product = row["Name"],
Revenue = row.GetValue<decimal>("Price") * row.GetValue<int>("Quantity"),
Category = row["Category"]
})
.GroupBy(x => x.Category)
.Select(group => new {
Category = group.Key,
TotalRevenue = group.Sum(x => x.Revenue),
ProductCount = group.Count()
})
.OrderByDescending(x => x.TotalRevenue)
.Take(10);

// To CSV
pipeline.WriteToCsv("output.csv");
// To JSON
pipeline.WriteToJson("output.json");
// To Excel
pipeline.WriteToExcel("report.xlsx", "Results");
// To SQL
pipeline.WriteToSql(connectionString, "TargetTable");
// To collection
var results = pipeline.ToList();
var array = pipeline.ToArray();

Export active customers with their order totals:
DataFlow.From.Sql(connectionString)
.Query(@"
SELECT c.Id, c.Name, c.Email, COUNT(o.Id) as OrderCount, COALESCE(SUM(o.Total), 0) as TotalSpent
FROM Customers c
LEFT JOIN Orders o ON c.Id = o.CustomerId
WHERE c.IsActive = 1
GROUP BY c.Id, c.Name, c.Email")
.Map(row => new {
CustomerId = row["Id"],
Name = row["Name"],
Email = row["Email"],
OrderCount = row["OrderCount"],
TotalSpent = row["TotalSpent"],
CustomerValue = row.GetValue<int>("OrderCount") > 10 ? "High" : "Normal"
})
.OrderByDescending(x => x.TotalSpent)
.WriteToCsv("customer_report.csv");Clean messy CSV data:
DataFlow.From.Csv("raw_data.csv")
.RemoveDuplicates("Id")
.FillMissing("Email", "[email protected]")
.FillMissing("Country", "USA")
.Map(row => {
row["Email"] = row["Email"].ToString().ToLower().Trim();
row["Phone"] = Regex.Replace(row["Phone"].ToString(), @"[^\d]", "");
return row;
})
.Filter(row => IsValidEmail(row["Email"].ToString()))
.WriteToCsv("cleaned_data.csv");Speed up CPU-intensive operations:
DataFlow.From.Csv("large_dataset.csv")
.AsParallel(maxDegreeOfParallelism: Environment.ProcessorCount)
.Map(row => {
// Some expensive operation
row["Hash"] = ComputeExpensiveHash(row["Data"]);
row["Processed"] = true;
return row;
})
.WriteToCsv("processed.csv");Validate and handle errors:
var validator = new DataValidator()
.Required("Id", "Name", "Email")
.Email("Email")
.Range("Age", min: 0, max: 150)
.Regex("Phone", @"^\d{10}$")
.Custom("StartDate", value => DateTime.TryParse(value, out var startDate) && startDate <= DateTime.Now);
DataFlow.From.Csv("users.csv")
.Validate(validator)
.OnInvalid(ErrorStrategy.LogAndSkip) // or ThrowException, Fix, Collect
.WriteToCsv("valid_users.csv");
// Access validation errors
var errors = pipeline.ValidationErrors;Process multi-gigabyte files with constant memory usage:
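The `Collect` strategy mentioned in the comment is the one to reach for when you want to report on the bad rows afterwards. The exact shape of the items in `ValidationErrors` isn't shown here, so this sketch just prints them:

```csharp
var pipeline = DataFlow.From.Csv("users.csv")
    .Validate(validator)
    .OnInvalid(ErrorStrategy.Collect); // keep the errors instead of logging and skipping

pipeline.WriteToCsv("valid_users.csv");

// Inspect whatever the library collected; adjust once you know the error type's members
foreach (var error in pipeline.ValidationErrors)
{
    Console.WriteLine(error);
}
```

Process multi-gigabyte files with constant memory usage: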
DataFlow.From.Csv("10gb_log_file.csv")
.Filter(row => row["Level"] == "ERROR")
.Select(row => new {
Timestamp = row["Timestamp"],
Message = row["Message"],
Source = row["Source"]
})
.WriteToCsv("errors_only.csv"); // Streams directly, uses ~50MB RAMImplement IDataSource for custom sources:
public class MongoDataSource : IDataSource
{
public IEnumerable<DataRow> Read()
{
// Your MongoDB reading logic; 'collection' and 'filter' come from your driver setup
foreach (var doc in collection.Find(filter))
{
yield return new DataRow(doc.ToDictionary());
}
}
}
// Use it
var pipeline = DataFlow.From.Custom(new MongoDataSource());
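The same pattern works for anything you can enumerate lazily. Here's a hypothetical `LogFileSource` that streams a pipe-delimited log file; it assumes `DataRow` accepts a `Dictionary<string, object>`, the same way the Mongo example feeds it `doc.ToDictionary()`:

```csharp
using System.Collections.Generic;
using System.IO;

public class LogFileSource : IDataSource
{
    private readonly string _path;

    public LogFileSource(string path) => _path = path;

    public IEnumerable<DataRow> Read()
    {
        // File.ReadLines streams one line at a time, so large logs stay cheap
        foreach (var line in File.ReadLines(_path))
        {
            var parts = line.Split('|');
            if (parts.Length < 3) continue; // skip malformed lines

            yield return new DataRow(new Dictionary<string, object>
            {
                ["Timestamp"] = parts[0],
                ["Level"] = parts[1],
                ["Message"] = parts[2]
            });
        }
    }
}
```

Wire it up the same way: `DataFlow.From.Custom(new LogFileSource("app.log"))`.

Create reusable transformations: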
public static class MyTransformations
{
public static IPipeline<DataRow> NormalizePhoneNumbers(this IPipeline<DataRow> pipeline)
{
return pipeline.Map(row => {
if (row.ContainsKey("Phone"))
{
row["Phone"] = NormalizePhone(row["Phone"].ToString());
}
return row;
});
}
}
// Use it
pipeline.NormalizePhoneNumbers().WriteToCsv("output.csv");
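`NormalizePhone` is up to you. A minimal version that matches the digits-only cleanup shown earlier:

```csharp
using System.Text.RegularExpressions;

static string NormalizePhone(string raw)
{
    // Same idea as the cleanup example: keep digits, drop everything else
    return Regex.Replace(raw ?? string.Empty, @"[^\d]", "");
}
```

Monitor long-running operations: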
var progress = new Progress<int>(percent =>
Console.WriteLine($"Processing: {percent}%"));
DataFlow.From.Csv("large_file.csv")
.WithProgress(progress)
.Filter(row => ComplexFilter(row))
.WriteToCsv("filtered.csv");Benchmarks on 1M records (Intel i7, 16GB RAM):
| Operation | Memory Usage | Time | Records/sec |
|---|---|---|---|
| CSV Read + Filter + Write | 42 MB | 3.2s | 312,500 |
| JSON Parse + Transform | 156 MB | 5.1s | 196,078 |
| SQL Read + Group + Export | 89 MB | 4.7s | 212,765 |
| Parallel Transform (8 cores) | 203 MB | 1.4s | 714,285 |
Memory usage stays constant regardless of file size when streaming.
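Your numbers will depend on disk speed, row width, and the transforms involved, so treat the table as a ballpark. A rough harness like this (file names are made up) lets you check against your own data:

```csharp
using System;
using System.Diagnostics;

var sw = Stopwatch.StartNew();

DataFlow.From.Csv("one_million_rows.csv")
    .Filter(row => row["Status"] == "Active")
    .WriteToCsv("filtered_rows.csv");

sw.Stop();
Console.WriteLine($"Elapsed: {sw.Elapsed.TotalSeconds:F1}s");
Console.WriteLine($"Peak working set: {Process.GetCurrentProcess().PeakWorkingSet64 / (1024 * 1024)} MB");
```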
Global settings via DataFlowConfig:
DataFlowConfig.Configure(config => {
config.DefaultCsvDelimiter = ';';
config.DefaultDateFormat = "yyyy-MM-dd";
config.BufferSize = 8192;
config.EnableAutoTypeConversion = true;
config.ThrowOnMissingColumns = false;
});

MIT license - do whatever you want with it.
Built because I got tired of writing the same ETL code over and over.