SQL Server Connector
The SQL Server connector has three different parts:
- Source - Reads data from a SQL Server table.
- Sink - Writes results from a stream into a SQL Server table.
- Table Provider - Provides table information to the SQL plan builder.
Supported Data Types
The SQL Server connector supports reading and writing the following data types:
- Int
- BigInt
- Binary
- Bit
- Char
- Date
- Datetime
- Datetime2
- Decimal
- Float
- Image
- Money
- Nchar
- Ntext
- Numeric
- Nvarchar
- Real
- Smalldatetime
- Smallint
- Text
- Time
- Tinyint
- Uniqueidentifier
- Varbinary
- Varchar
- Xml
Source
The SQL Server Source allows Flowtide to fetch rows and upates from a SQL Server table.
It's strongly recommended that change tracking is enabled on the targeted tables. And must be enabled to allow near-realtime streaming.
The source uses the following logic to fetch data into the stream:
Reading with change tracking
To configure a source for a table with change tracking:
connectors.AddSqlServerSource(() => "connectionstring");
connectors.AddSqlServerSource(new SqlServerSourceOptions
{
ConnectionStringFunc = () => "",
});
By default changes are fetched once per second and can be modified using the DeltaLoadInterval
option.
Reading without change tracking
Flowtide can read data from sources that do not have change tracking enabled. For these sources data are all data is fetched on an interval, specified with the FullReloadInterval
option on the source.
Note that changes are not directly caught when change tracking is disabled.
Reading from views
To allow from sql server views the following options must be set:
connectors.AddSqlServerSource(new SqlServerSourceOptions
{
ConnectionStringFunc = () => "",
EnableFullReload = true,
FullReloadInterval = TimeSpan.FromHours(24),
});
This will read all data from the view on an interval specified. Delta loading data is not enabled when targeting a view.
Reading from tables
When targeting a table that does not have change tracking enabled an additional option must be provided AllowFullReloadOnTablesWithoutChangeTracking
.
connectors.AddSqlServerSource(new SqlServerSourceOptions
{
ConnectionStringFunc = () => "",
EnableFullReload = true,
FullReloadInterval = TimeSpan.FromHours(24),
AllowFullReloadOnTablesWithoutChangeTracking = true
});
This will read all data from the table on an interval specified in FullReloadInterval
. Delta loading data is not enabled when targeting a table without change tracking.
If the table supports change tracking, delta loading will still be used even if these options are provided. But a full load will occur on the provided interval.
Reading from large views or tables without change tracking
When targeting a large view or table it's possible to control the allowed size (number of rows) with the FullLoadMaxRowCount
option. This default value is 1 000 000 rows.
Retry strategy (reading from SQL Server)
By default the SQL Server source has a default retry strategy that will retry up to 10 times with increasing intervals, totaling a period of ~16 minutes. If no successful connection could be made during this period the stream will be restarted.
A custom pipeline can be specified on the source by setting the ResiliencePipeline
property.
connectors.AddSqlServerSource(new SqlServerSourceOptions
{
ResiliencePipeline = myPipeline
});
Flowtide uses polly
to handle retries, documentation and examples can be found here: Polly.
Sink
The SQL Server Sink implements the grouped write operator. This means that all rows are grouped by a primary key, thus all sink tables must have a primary key defined.
All SQL Server Sink tables must have a primary key defined. The primary key must also be in the query that fills the table.
Its implementation waits fully until the stream has reached a steady state at a time T until it writes data to the database. This means that its table output can always be traced back to a state from the source systems.
To use the SQL Server Sink add the following line to the ConnectorManager:
connectorManager.AddSqlServerSink(() => connectionString);
As with the SQL Server Source, the connection string is returned by a function to enable dynamic connection strings.
The sink inserts data into SQL Server by creating a temporary table, which follows the table structure of the destination with an added operation metadata column. The data is inserted into the temporary table using Bulk Copy. This allows for fast binary insertion into the temporary table.
After data has been inserted into the temporary table, a merge into statement is run that merges data into the destination table. After all data has been merged, the temporary table is cleared of all data.
If there are multiple rows in the result with the same primary key, only the latest seen row will be inserted into the destination table.
Custom Primary Keys
In some scenarios you may want to override the table's primary keys or the table might not have a primary key configured. In this scenario you can provide column names for the columns you want Flowtide to use as primary keys.
Ex:
connectorManager.AddSqlServerSink("your regexp on table names", new SqlServerSinkOptions() {
ConnectionStringFunc = () => connectionString,
CustomPrimaryKeys = new List<string>() { "my_column1", "my_column2" }
});
SQL Table Provider
The SQL table provider is added to the SQL plan builder which will try and look after used tables in its configured SQL Server. It provides metadata information about what the column names are in the table.
To use the table provider add the following line to the Sql plan builder:
sqlBuilder.AddSqlServerProvider(() => connectionString);
If you are starting Flowtide with dependency injection, a table provider is added automatically, so this step is not required.