Skip to main content

Integration Testing

This section assumes that the Flowtide stream is implemented as a webapplication with minimal API.

Given a stream that is configured in a similar matter to this:

var builder = WebApplication.CreateBuilder(args);

builder.Services.AddFlowtideStream("stream")
.AddSqlFileAsPlan("stream.sql")
.AddConnectors(c =>
{
// Add empty sql server sinks that will be overridden by the tests
c.AddSqlServerSource(() => "");
c.AddSqlServerSink(() => "");
})
.AddStorage(storage =>
{
storage.AddFasterKVAzureStorage("", "", "");
});

var app = builder.Build();

app.Run();

public partial class Program { }

Create an XUnit test project and install the following nuget packages:

  • FlowtideDotNet.TestFramework
  • Microsoft.AspNetCore.Mvc.Testing

Configure WebApplicationFactory

Create a class for your integration tests and configure a WebApplicationFactory.

With the WebApplicationFactory it is possible to override connectors and storage to allow using different connectors for the test.

public class IntegrationTests : IDisposable
{
private readonly WebApplicationFactory<Program> _factory;
private readonly StreamTestMonitor _inProcessMonitor;

public IntegrationTests()
{
_factory = new WebApplicationFactory<Program>().WithWebHostBuilder(b =>
{
b.ConfigureTestServices(services =>
{
services.AddFlowtideStream("stream")
// Add to override connectors
.AddConnectors(c =>
{
// Add new connectors here
})
.AddStorage(storage =>
{
// Change to temporary storage for testing
storage.AddTemporaryDevelopmentStorage();
})
// Add a test monitor that can be used to check for checkpoints (which ensures data updates)
.AddStreamTestMonitor(_inProcessMonitor);
});
});
}

public void Dispose()
{
_factory.Dispose();
}
}

Add a test table as a source

It is possible to create a test table where you can create mock data used for the tests.

Example:

public class IntegrationTests : IDisposable
{
private TestDataTable _source;
...

public IntegrationTests()
{
_source = TestDataTable.Create(new[]
{
new { val = 0 },
new { val = 1 },
new { val = 2 },
new { val = 3 },
new { val = 4 }
});

_factory = new WebApplicationFactory<Program>().WithWebHostBuilder(b =>
{
b.ConfigureTestServices(services =>
{
services.AddFlowtideStream("stream")
// Add to override connectors
.AddConnectors(c =>
{
c.AddTestDataTable("testtable", _source);
})
...
});
});
}
}

A test data table can be added under AddConnectors with the specific table name it should be registered under.

Add a test data sink

It may be useful to add a test data sink which allows evaluating the output of the stream. This can be used to evaluate that the query plan actually results with the expected data.

public class IntegrationTests : IDisposable
{
private TestDataSink _sink;
...

public IntegrationTests()
{
_sink = new TestDataSink();

_factory = new WebApplicationFactory<Program>().WithWebHostBuilder(b =>
{
b.ConfigureTestServices(services =>
{
services.AddFlowtideStream("stream")
// Add to override connectors
.AddConnectors(c =>
{
...
// Test data sink is added using a regexp expression that matches destination names
c.AddTestDataSink(".*", _sink);
})
...
});
});
}
}

Creating a test

The next step is to create the actual test case to test the stream.

[Fact]
public async Task TestStreamOutput()
{
_factory.CreateClient(); //Create a client to start the stream

// Wait for the stream to checkpoint before asserting the resulting data
await _inProcessMonitor.WaitForCheckpoint();

Assert.True(_sink.IsCurrentDataEqual(new[]
{
new { val = 0 },
new { val = 1 },
new { val = 2 },
new { val = 3 },
new { val = 4 }
}));
}

Full example

Here is the full example of the test class:

public class IntegrationTests : IDisposable
{
private readonly WebApplicationFactory<Program> _factory;
private TestDataSink _sink;
private TestDataTable _source;
private StreamTestMonitor _inProcessMonitor;

public IntegrationTests()
{
_source = TestDataTable.Create(new[]
{
new { val = 0 },
new { val = 1 },
new { val = 2 },
new { val = 3 },
new { val = 4 }
});

_sink = new TestDataSink();
_inProcessMonitor = new StreamTestMonitor();
_factory = new WebApplicationFactory<Program>().WithWebHostBuilder(b =>
{
b.ConfigureTestServices(services =>
{
services.AddFlowtideStream("stream")
.AddConnectors(c =>
{
// Override connectors
c.AddTestDataTable("testtable", _source);
c.AddTestDataSink(".*", _sink);
})
.AddStorage(storage =>
{
// Change to temporary storage for unit tests
storage.AddTemporaryDevelopmentStorage();
})
.AddStreamTestMonitor(_inProcessMonitor);
});
});
}

public void Dispose()
{
_factory.Dispose();
}

[Fact]
public async Task TestStreamOutput()
{
_factory.CreateClient(); //Create a client to start the stream

await _inProcessMonitor.WaitForCheckpoint();

Assert.True(_sink.IsCurrentDataEqual(new[]
{
new { val = 0 },
new { val = 1 },
new { val = 2 },
new { val = 3 },
new { val = 4 }
}));

// Add a new row
_source.AddRows(new { val = 5 });

// Remove a row
_source.RemoveRows(new { val = 3 });

await _inProcessMonitor.WaitForCheckpoint();

Assert.True(_sink.IsCurrentDataEqual(new[]
{
new { val = 0 },
new { val = 1 },
new { val = 2 },
new { val = 4 },
new { val = 5 }
}));
}
}