Inicio de la guía 1. Topología de la solución 2. Aspire como plano de control local 3. Configuración y contratos compartidos 4. Metadatos con SQLite y EF Core 5. API e interfaz de carga 6. Almacenamiento de objetos con MinIO 7. Pipeline de ingesta del worker 8. Extracción y división de texto 9. Artefactos literarios 10. Abstracciones de proveedores de IA 11. Almacenamiento vectorial con Qdrant 12. Flujo de preguntas y recuperación 13. Prompts y citas 14. Pruebas del pipeline 15. Notas de desarrollo local Todas las guías
Navegación de la guíaÍndice y capítulos
Capítulo 7

Pipeline de ingesta del worker

RAG.Worker/Worker.cs es un servicio en segundo plano que consulta periódicamente. En cada intervalo configurado, pide a IDocumentIngestionService que procese documentos pendientes.

Nota de traducción: Esta versión en español fue traducida con ayuda de un LLM y revisada para conservar los términos técnicos, el código y los nombres de archivos en inglés cuando corresponde.
Imagen decorativa del capítulo sobre Pipeline de ingesta del worker
El servicio de ingesta controla el pipeline de larga duración
await UpdateProgressAsync(document, "Extracting text", 8, cancellationToken);
var extracted = await extractor.ExtractAsync(original, document.ContentType, document.FileName, cancellationToken);

await UpdateProgressAsync(document, "Chunking text", 18, cancellationToken);
var sourceChunks = chunker.Chunk(document.Id, document.FileName, document.ObjectKey, extracted);

RAG.Worker/Worker.cs es un servicio en segundo plano que consulta periódicamente. En cada intervalo configurado, pide a IDocumentIngestionService que procese documentos pendientes.

El worker soporta recuperación de procesamiento vencido. Si un documento está marcado como Processing pero no se ha actualizado recientemente, puede tomarse de nuevo. Esto es útil durante desarrollo cuando la app se detiene a mitad de la ingesta.

La consulta de polling está detrás de IIngestionWorkSource. DatabaseIngestionWorkSource es la implementación predeterminada y lee filas pendientes o vencidas desde SQLite, ordenadas por fecha de creación y limitadas a un lote pequeño. Una versión de producción podría reemplazar ese punto con una fuente basada en cola sin cambiar el pipeline de ingesta.

RAG.Core/Services/DocumentIngestionService.cs es el pipeline:

  1. marcar el documento como Processing;
  2. verificar que almacenamiento y Qdrant estén disponibles;
  3. abrir el archivo original desde MinIO;
  4. extraer texto;
  5. dividir texto fuente en chunks;
  6. generar artefactos literarios;
  7. combinar chunks de artefactos y chunks fuente;
  8. borrar vectores anteriores del documento;
  9. generar embeddings;
  10. hacer upsert de chunks con embeddings en Qdrant;
  11. marcar el documento como Indexed;
  12. guardar progreso final.

El progreso se actualiza entre etapas principales:

Preparing storage
Extracting text
Chunking text
Building book club profile
Resetting existing index
Generating embeddings
Writing vector index
Ready

Si ocurre cualquier excepción, el documento se marca como Failed y el mensaje de error se muestra en la UI.

Nota de producción: la reindexación actualmente borra los vectores existentes antes de que el índice de reemplazo haya terminado con éxito, y los errores de ingesta se muestran directamente en la UI para visibilidad del desarrollador. En producción, un enfoque más seguro construiría primero el índice de reemplazo, haría el cambio solo después del éxito y mantendría el texto detallado de excepciones en logs en lugar de respuestas visibles para usuarios. Este ejemplo favorece la legibilidad porque es un proyecto de aprendizaje.

El servicio de ingesta también registra hitos estructurados: tamaño del lote de work source, inicio y fin de ingesta, conteo de páginas extraídas, conteo de chunks fuente, conteo de artefactos generados, conteo de embeddings, conteo de upserts vectoriales y fallas. Los logs evitan texto completo de documentos y prompts; se enfocan en IDs, conteos, etapas y tiempos que ayudan a operar el pipeline.