Guide Home 1. Solution Topology 2. Aspire as 3. Shared Configuration 4. Metadata with 5. Upload API 6. Object Storage 7. Worker Ingestion 8. Extracting and 9. Literary Artifacts 10. AI Provider 11. Qdrant Vector 12. Ask Flow 13. Prompting and 14. Testing the 15. Local Development All Guides
Guide navigationIndex and chapters
Chapter 7

Worker Ingestion Pipeline

RAG.Worker/Worker.cs is a polling background service. Every configured interval, it asks IDocumentIngestionService to process pending documents.

Decorative chapter image for Worker Ingestion Pipeline
The ingestion service owns the long-running pipeline
await UpdateProgressAsync(document, "Extracting text", 8, cancellationToken);
var extracted = await extractor.ExtractAsync(original, document.ContentType, document.FileName, cancellationToken);

await UpdateProgressAsync(document, "Chunking text", 18, cancellationToken);
var sourceChunks = chunker.Chunk(document.Id, document.FileName, document.ObjectKey, extracted);

RAG.Worker/Worker.cs is a polling background service. Every configured interval, it asks IDocumentIngestionService to process pending documents.

The worker supports stale processing recovery. If a document is marked Processing but has not updated recently, it can be picked up again. This is useful during development when the app is stopped mid-ingestion.

The polling query is behind IIngestionWorkSource. DatabaseIngestionWorkSource is the default implementation and reads pending or stale rows from SQLite, ordered by creation time and capped to a small batch. A production version could replace that one seam with a queue-backed source while leaving the ingestion pipeline unchanged.

RAG.Core/Services/DocumentIngestionService.cs is the pipeline:

  1. mark document as Processing;
  2. verify storage and Qdrant are available;
  3. open the original file from MinIO;
  4. extract text;
  5. chunk source text;
  6. generate literary artifacts;
  7. combine artifact chunks and source chunks;
  8. delete old vectors for the document;
  9. generate embeddings;
  10. upsert embedded chunks to Qdrant;
  11. mark the document Indexed;
  12. save final progress.

Progress is updated between major stages:

Preparing storage
Extracting text
Chunking text
Building book club profile
Resetting existing index
Generating embeddings
Writing vector index
Ready

If any exception occurs, the document is marked Failed and the error message is surfaced to the UI.

Production note: reindexing currently deletes the existing vectors before the replacement index has fully succeeded, and ingestion errors are shown directly in the UI for developer visibility. In production, a safer approach would build the replacement index first, swap only after success, and keep detailed exception text in logs rather than user-facing responses. This sample favors readability because it is a learning project.

The ingestion service also logs structured milestones: work-source batch size, ingestion start and completion, extracted page count, source chunk count, generated artifact count, embedding count, vector upsert count, and failures. The logs avoid full document text and prompts; they focus on IDs, counts, stages, and timings that help operate the pipeline.