Pipeline de ingesta del worker
RAG.Worker/Worker.cs es un servicio en segundo plano que consulta periódicamente. En cada intervalo configurado, pide a IDocumentIngestionService que procese documentos pendientes.
El servicio de ingesta controla el pipeline de larga duración
await UpdateProgressAsync(document, "Extracting text", 8, cancellationToken);
var extracted = await extractor.ExtractAsync(original, document.ContentType, document.FileName, cancellationToken);
await UpdateProgressAsync(document, "Chunking text", 18, cancellationToken);
var sourceChunks = chunker.Chunk(document.Id, document.FileName, document.ObjectKey, extracted);RAG.Worker/Worker.cs es un servicio en segundo plano que consulta periódicamente. En cada intervalo configurado, pide a IDocumentIngestionService que procese documentos pendientes.
El worker soporta recuperación de procesamiento vencido. Si un documento está marcado como Processing pero no se ha actualizado recientemente, puede tomarse de nuevo. Esto es útil durante desarrollo cuando la app se detiene a mitad de la ingesta.
La consulta de polling está detrás de IIngestionWorkSource. DatabaseIngestionWorkSource es la implementación predeterminada y lee filas pendientes o vencidas desde SQLite, ordenadas por fecha de creación y limitadas a un lote pequeño. Una versión de producción podría reemplazar ese punto con una fuente basada en cola sin cambiar el pipeline de ingesta.
RAG.Core/Services/DocumentIngestionService.cs es el pipeline:
- marcar el documento como
Processing; - verificar que almacenamiento y Qdrant estén disponibles;
- abrir el archivo original desde MinIO;
- extraer texto;
- dividir texto fuente en chunks;
- generar artefactos literarios;
- combinar chunks de artefactos y chunks fuente;
- borrar vectores anteriores del documento;
- generar embeddings;
- hacer upsert de chunks con embeddings en Qdrant;
- marcar el documento como
Indexed; - guardar progreso final.
El progreso se actualiza entre etapas principales:
Preparing storage
Extracting text
Chunking text
Building book club profile
Resetting existing index
Generating embeddings
Writing vector index
Ready
Si ocurre cualquier excepción, el documento se marca como Failed y el mensaje de error se muestra en la UI.
Nota de producción: la reindexación actualmente borra los vectores existentes antes de que el índice de reemplazo haya terminado con éxito, y los errores de ingesta se muestran directamente en la UI para visibilidad del desarrollador. En producción, un enfoque más seguro construiría primero el índice de reemplazo, haría el cambio solo después del éxito y mantendría el texto detallado de excepciones en logs en lugar de respuestas visibles para usuarios. Este ejemplo favorece la legibilidad porque es un proyecto de aprendizaje.
El servicio de ingesta también registra hitos estructurados: tamaño del lote de work source, inicio y fin de ingesta, conteo de páginas extraídas, conteo de chunks fuente, conteo de artefactos generados, conteo de embeddings, conteo de upserts vectoriales y fallas. Los logs evitan texto completo de documentos y prompts; se enfocan en IDs, conteos, etapas y tiempos que ayudan a operar el pipeline.