OpenLineage

Flowtide has built-in support for reporting data lineage events to an OpenLineage-compatible endpoint over HTTP. When enabled, the reporter automatically sends a lineage event each time the stream changes state (starting, running, stopped, or failed).

Setup with Dependency Injection

If you have added your stream using AddFlowtideStream, you can enable OpenLineage HTTP reporting with the AddOpenLineageHttp extension method.

Install the following NuGet package:

  • FlowtideDotNet.DependencyInjection

Add the following code to your Program.cs:

builder.Services.AddFlowtideStream("mystream")
    .AddOpenLineageHttp(opt =>
    {
        opt.Url = "http://localhost:5000/api/v1/lineage";
    });

Setup with FlowtideBuilder

If you are using FlowtideBuilder directly, call WithOpenLineageHttp:

var builder = new FlowtideBuilder("mystream")
    .AddPlan(plan)
    .WithOpenLineageHttp(new OpenLineageHttpOptions
    {
        Url = "http://localhost:5000/api/v1/lineage"
    });

Configuration Options

The following options are available on OpenLineageHttpOptions:

| Option | Type | Required | Description |
| --- | --- | --- | --- |
| Url | string? | Yes | The URL of the OpenLineage HTTP endpoint to send lineage events to. |
| IncludeSchema | bool | No | Whether to include schema information in the lineage events. Defaults to false. |
| RunId | Guid? | No | A custom run identifier. If not set, a new Guid is generated automatically for each stream. |
| OnRequest | Action<HttpRequestMessage>? | No | A callback invoked on each outgoing HTTP request before it is sent. |
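
RunId is the only option without its own example in the sections below. The fragment here is a minimal sketch of setting it, assuming the same AddFlowtideStream setup as in the dependency injection section. The Guid value is an arbitrary placeholder chosen for illustration.

```csharp
builder.Services.AddFlowtideStream("mystream")
    .AddOpenLineageHttp(opt =>
    {
        opt.Url = "http://localhost:5000/api/v1/lineage";
        // Use a fixed run identifier instead of an automatically generated one.
        // The value below is a placeholder for illustration only.
        opt.RunId = Guid.Parse("3f2b6c1e-8a4d-4e0b-9c57-1d2a7f0e5b11");
    });
```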

Adding Authentication Headers

Use the OnRequest callback to modify outgoing HTTP requests, for example to add an authorization header:

builder.Services.AddFlowtideStream("mystream")
    .AddOpenLineageHttp(opt =>
    {
        opt.Url = "http://localhost:5000/api/v1/lineage";
        opt.OnRequest = message =>
        {
            // "my-token" is a placeholder; supply a real token, for example from configuration.
            message.Headers.Authorization =
                new System.Net.Http.Headers.AuthenticationHeaderValue("Bearer", "my-token");
        };
    });

Including Schema Information

Set IncludeSchema to true to include dataset schema (column-level) information in the lineage events:

builder.Services.AddFlowtideStream("mystream")
    .AddOpenLineageHttp(opt =>
    {
        opt.Url = "http://localhost:5000/api/v1/lineage";
        opt.IncludeSchema = true;
    });

Reported Events

The reporter listens to stream state changes and sends OpenLineage events accordingly:

| Stream State | OpenLineage Event Type | Description |
| --- | --- | --- |
| Starting | START | The stream is starting up. |
| Running | RUNNING | The stream is running normally. |
| Stopped | COMPLETE | The stream has stopped in response to a stop request. |
| Failure | FAIL | The stream has encountered an error. |
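
The mapping in the table above can be sketched in standalone C#. This is an illustration only, not Flowtide's implementation: the StreamState enum, the LineageSketch class, the job namespace, the producer URI, and the schema URL version are all hypothetical placeholders. The JSON field names follow the OpenLineage run event format, but the exact payload Flowtide sends may differ.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical stand-in for the stream states listed in the table above.
public enum StreamState { Starting, Running, Stopped, Failure }

public static class LineageSketch
{
    // Maps a stream state to the OpenLineage event type from the table above.
    public static string ToEventType(StreamState state) => state switch
    {
        StreamState.Starting => "START",
        StreamState.Running => "RUNNING",
        StreamState.Stopped => "COMPLETE",
        StreamState.Failure => "FAIL",
        _ => throw new ArgumentOutOfRangeException(nameof(state))
    };

    // Builds a minimal OpenLineage-style run event as JSON.
    // The namespace, producer, and schemaURL values are placeholders.
    public static string BuildEvent(
        StreamState state, Guid runId, string streamName, DateTimeOffset time)
    {
        var payload = new Dictionary<string, object>
        {
            ["eventType"] = ToEventType(state),
            ["eventTime"] = time.ToString("O"), // ISO 8601 round-trip format
            ["run"] = new Dictionary<string, object>
            {
                ["runId"] = runId.ToString()
            },
            ["job"] = new Dictionary<string, object>
            {
                ["namespace"] = "example-namespace",
                ["name"] = streamName
            },
            ["producer"] = "https://example.com/hypothetical-producer",
            ["schemaURL"] = "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent"
        };
        return JsonSerializer.Serialize(payload);
    }
}

public static class Program
{
    public static void Main()
    {
        // Print one event per state, all sharing the same run identifier.
        var runId = Guid.NewGuid();
        foreach (StreamState state in Enum.GetValues(typeof(StreamState)))
        {
            Console.WriteLine(
                LineageSketch.BuildEvent(state, runId, "mystream", DateTimeOffset.UtcNow));
        }
    }
}
```

Every event in the sketch carries the same runId, which is what lets an OpenLineage backend group the START, RUNNING, and COMPLETE or FAIL events into a single run.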