Qdrant connector

The Qdrant connector supports only a sink, which stores data in a Qdrant collection.

To use it, add the following NuGet package:

  • FlowtideDotNet.Connector.Qdrant

Qdrant sink

The Qdrant sink is used to continuously insert, update, and delete points in a Qdrant collection.

Flowtide adds identifying metadata under the flowtide key. This metadata is used to track and update rows.

It also adds some general metadata under the data key (unless changed with the QdrantPayloadDataPropertyName option). This contains information such as last_update and which embedding generator was used, as well as the vectorized string (unless disabled with the QdrantIncludeVectorTextInPayload option).

info

It's important that the Qdrant collection contains payload indexes for the keys flowtide and id.

  • The flowtide index should be a text index.
  • The id index should (most likely) be a keyword index.

Without these indexes, resource usage will be drastically higher and performance will suffer.

To use the sink, add it to the ConnectorManager:

// Optional string chunker. It is defined first so that it can be passed to AddQdrantSink below.
var chunker = new TikTokenChunker(new TikTokenChunkerOptions
{
    Tokenizer = TiktokenTokenizer.CreateForModel("gpt-4o"),
    TokenChunkSize = 170,
    TokenChunkOverlap = 20,
});

// Alternative 1: register the sink with your own IEmbeddingGenerator implementation.
// `collection` is a string holding the name of the target Qdrant collection.
connectorManager.AddQdrantSink("{regexPattern}", new QdrantSinkOptions
    {
        QdrantChannelFunc = () => QdrantChannel.ForAddress("{qdrantAddress}"),
        CollectionName = collection,
    },
    EmbeddingGeneratorImplementation,
    chunker);

// Alternative 2: use the built-in OpenAI embedding generator by passing OpenAiEmbeddingOptions.
connectorManager.AddQdrantSink("{regexPattern}", new QdrantSinkOptions
    {
        QdrantChannelFunc = () => QdrantChannel.ForAddress("{qdrantAddress}"),
        CollectionName = collection,
    },
    new OpenAiEmbeddingOptions
    {
        ApiKeyFunc = () => "{OpenAi api key}",
        UrlFunc = () => "{OpenAi api url}"
    },
    chunker);

Special columns

Two columns are required and must be selected:

  • id: The identifier/PK for the row. This can be modified with the IdColumnName option.
  • vector_string: The string content that should be vectorized. This can be modified with the VectorStringColumnName option.
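
As a rough illustration of the two required columns, the sketch below holds a query that aliases source columns to id and vector_string. The table name documents, the columns doc_id, body and title, and the target name my_collection are all hypothetical. It also assumes that the target of INSERT INTO is what the sink's "{regexPattern}" is matched against; check this against your own stream setup.

```csharp
public static class QdrantSinkQuerySketch
{
    // Hypothetical names: documents, doc_id, body, title and my_collection.
    // Assumption: the INSERT INTO target is matched by the regex pattern
    // that was passed to AddQdrantSink.
    public const string Sql = @"
INSERT INTO my_collection
SELECT
    doc_id AS id,            -- becomes the point identity (IdColumnName)
    body AS vector_string,   -- text that is vectorized (VectorStringColumnName)
    title                    -- any other selected column
FROM documents";
}
```

If the source columns already have other names that you want to keep, the IdColumnName and VectorStringColumnName options can be set instead of aliasing.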

Options

Options for the sink:

Option | Required | Default | Description
--- | --- | --- | ---
QdrantChannelFunc | true | | Channel for communicating with the Qdrant instance.
CollectionName | true | | The collection name that should be used to store data.
IdColumnName | true | id | The selected column that should be used as identity.
VectorStringColumnName | true | vector_string | The selected column that should be vectorized.
OnInitialize | false | | Hook that is run when the sink is initializing.
OnInitialDataSent | false | | Hook that is run after initial data has been sent.
OnChangesDone | false | | Hook that is run after changes have been handled.
Wait | false | true | Wait for calls against Qdrant to complete before continuing.
QdrantVectorTextPropertyName | true | text | The property name that should be used for the vectorized string in the payload.
QdrantPayloadDataPropertyName | true | data | The key under which metadata is added in the Qdrant collection. This contains information such as last_update and which embedding generator was used.
QdrantIncludeVectorTextInPayload | true | true | Whether the vectorized string should be stored in the payload.
QdrantStoreMapsUnderOwnKey | true | false | If true, any selected map or named_struct is added under its own key in the payload.
QdrantStoreListsUnderOwnKey | true | false | If true, any selected list is added under its own key in the payload.
QdrantPayloadUpdateMode | true | OverWritePayload | Set or overwrite the payload when updating a point. Set keeps any property not handled by the Flowtide stream; overwrite removes any such property.
ResiliencePipeline | true | Incremental retries | Resilience pipeline for requests against Qdrant. The default is an incremental retry strategy that retries 10 times.
MaxNumberOfBatchOperations | true | 1000 | The maximum number of operations before the batch is sent to Qdrant.
MaxDepth | true | 64 | The maximum depth of selected map and named_struct columns.
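
The difference between the two QdrantPayloadUpdateMode behaviours can be illustrated with plain dictionaries. This is only a sketch of the semantics described in the table, not the connector's implementation. The class name, method names and payload keys below are made up for the illustration.

```csharp
using System.Collections.Generic;

public static class PayloadUpdateSketch
{
    // "Set" semantics: keys written by the stream replace existing values,
    // while keys the stream does not handle are kept.
    public static Dictionary<string, object> SetPayload(
        IDictionary<string, object> existing,
        IDictionary<string, object> fromStream)
    {
        var result = new Dictionary<string, object>(existing);
        foreach (var pair in fromStream)
        {
            result[pair.Key] = pair.Value;
        }
        return result;
    }

    // "Overwrite" semantics: the payload becomes exactly what the stream wrote,
    // so keys the stream does not handle are removed.
    public static Dictionary<string, object> OverwritePayload(
        IDictionary<string, object> existing,
        IDictionary<string, object> fromStream)
    {
        return new Dictionary<string, object>(fromStream);
    }
}
```

For a point whose payload holds a manually added key, SetPayload keeps that key and OverwritePayload drops it. This matters if other processes write to the same points.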

Example updating alias on initialize

OnInitialize = async (state, client) =>
{
    var collection = state.CollectionName;
    // Set the collection name to a unique name based on the current version of the stream.
    // Note that Flowtide versioning needs to be added with, for instance, flowtideBuilder.AddVersioningFromString(...)
    if (!string.IsNullOrWhiteSpace(state.StreamVersion?.Version))
    {
        collection = $"collection-{state.StreamVersion.Version}";
    }

    state.CollectionName = collection;

    // If the collection does not already exist, add it to Qdrant and add payload indexes.
    if (!await client.CollectionExistsAsync(state.CollectionName))
    {
        await client.CreateCollectionAsync(state.CollectionName, new VectorParams
        {
            // Configure the collection. Size must match the embedding model's vector dimension.
            Size = 1536,
            Distance = Distance.Cosine,
        });

        // These indexes are important as they are used by Flowtide to filter and update points.
        await client.CreatePayloadIndexAsync(state.CollectionName, "id", PayloadSchemaType.Keyword);
        await client.CreatePayloadIndexAsync(state.CollectionName, "flowtide", PayloadSchemaType.Text);
    }
},
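OnInitialDataSent = async (state, client) =>
{
    // Delete the alias from the original collection and add it to the new collection.
    // `alias` is a string holding the alias name, defined elsewhere.
    // Each AliasOperations message carries a single action, so the delete and the create
    // are passed as two operations in the same call.
    await client.UpdateAliasesAsync(
    [
        new AliasOperations
        {
            DeleteAlias = new DeleteAlias
            {
                AliasName = alias,
            },
        },
        new AliasOperations
        {
            CreateAlias = new CreateAlias
            {
                AliasName = alias,
                CollectionName = state.CollectionName,
            },
        },
    ]);

    // Delete the old collection.
    await client.DeleteCollectionAsync(state.OriginalCollectionName);
},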

Embeddings generator

An embeddings generator is required for this sink. It should implement IEmbeddingGenerator.

OpenAi embeddings generator

OpenAiEmbeddingsGenerator is an embedding generator for the OpenAI API. It can also be used with any API that returns responses in the same format as OpenAI, such as Azure OpenAI.

Options

Option | Required | Default | Description
--- | --- | --- | ---
UrlFunc | true | | URL to the API.
ApiKeyFunc | true | | API key for the API.
MaxRequestsPerMinute | true | 900 | Maximum number of requests per minute. This is used by the default ResiliencePipeline.
ResiliencePipeline | true | Inner rate limiter with outer retry strategy. | Adds a rate limiter that permits MaxRequestsPerMinute requests per minute. If the rate limiter hits its limit, the retry strategy waits up to 1 minute until the next request occurs. For general errors, the retry strategy retries 5 times incrementally before failing.

String chunker

Optionally, the sink can take a string chunker (IStringChunker) that splits the vector_string into multiple smaller chunks.

TikTokenStringChunker

A simple chunker that removes newlines and splits the content into multiple parts with overlap based on the options TokenChunkSize and TokenChunkOverlap.

Options

Option | Required | Default | Description
--- | --- | --- | ---
TokenChunkSize | true | | The chunk size.
TokenChunkOverlap | true | | The chunk overlap. This many tokens from the previous chunk are added to the start of the current chunk.
TiktokenTokenizer | true | | The tiktoken tokenizer that should be used to tokenize the content.
MinTokenChunkSize | false | | If a chunk is below this limit, it is merged into the previous chunk.
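
To make the interplay between TokenChunkSize and TokenChunkOverlap more concrete, here is a minimal sketch of overlapping windows over a list of token ids. It is not the chunker's actual implementation. It assumes that the chunk size includes the overlapping tokens, and it leaves out newline removal and MinTokenChunkSize merging. The class and method names are made up for the illustration.

```csharp
using System;
using System.Collections.Generic;

public static class OverlapChunkingSketch
{
    // Splits tokens into windows of at most chunkSize tokens. Every window after
    // the first starts with the last `overlap` tokens of the previous window.
    public static List<List<int>> Chunk(IReadOnlyList<int> tokens, int chunkSize, int overlap)
    {
        if (chunkSize <= 0) throw new ArgumentOutOfRangeException(nameof(chunkSize));
        if (overlap < 0 || overlap >= chunkSize) throw new ArgumentOutOfRangeException(nameof(overlap));

        var chunks = new List<List<int>>();
        var step = chunkSize - overlap; // how far each new window advances

        for (var start = 0; start < tokens.Count; start += step)
        {
            var length = Math.Min(chunkSize, tokens.Count - start);
            var chunk = new List<int>(length);
            for (var i = 0; i < length; i++)
            {
                chunk.Add(tokens[start + i]);
            }
            chunks.Add(chunk);

            // Stop once a window has reached the end of the input.
            if (start + length >= tokens.Count) break;
        }

        return chunks;
    }
}
```

Under these assumptions, TokenChunkSize = 170 and TokenChunkOverlap = 20 mean that each new chunk advances 150 tokens past the start of the previous one.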