Skip to content

feat: run RAG via background jobs#1450

Draft
leonardmq wants to merge 11 commits into
leonard/kil-686-run-eval-via-jobfrom
leonard/kil-687-rag-via-job
Draft

feat: run RAG via background jobs#1450
leonardmq wants to merge 11 commits into
leonard/kil-686-run-eval-via-jobfrom
leonard/kil-687-rag-via-job

Conversation

@leonardmq

Copy link
Copy Markdown
Collaborator

Summary

  • New RagJobWorker runs the existing RagWorkflowRunner under the job system, with idempotency keyed on rag_config_id (re-launches supersede in-flight predecessors).
  • GET /api/projects/{project_id}/rag_configs/{rag_config_id}/run moved out of the shared kiln_server.document_api into a new desktop-only rag_jobs_api. Returns {kiln_job_tracking_id} immediately instead of streaming SSE; build_rag_workflow_runner stays in the shared lib as a pure helper.
  • Frontend rag_progress_store no longer opens per-config EventSources — it derives live progress, logs, and status from the project-scoped \$jobs store via a module-level reflector. Refreshing the configs list page now picks up runs spawned from any session.
  • JobContext grows report_display(primary?, secondary?) and report_metadata_patch(patch) for workers that need to rewrite the row's Details lines or attach structured per-kind state (RAG stamps the full RagProgress snapshot under metadata.rag_progress so the existing four-bar dialog keeps working).
  • rag_runners.py simplified: step runners yield absolute cumulative counts (was cumulative for the three doc-phases but per-batch deltas for indexing); workflow update_workflow_progress collapses from a four-arm match to a dict-mapped uniform assignment; logs accumulate on RagProgress.logs instead of being replaced per yield.
  • Re-snapshot at lock acquire + end of run. Parallel RAG runs that share an extractor / chunker / embedder used to leave each other's counters stuck at the pre-lock baseline (sibling A's work skips through B's collect_jobs, so B's per-phase step.success_count stays 0). The workflow now recomputes initial_progress after acquiring the workflow lock; the worker also re-snapshots once more at end of run for the final reported numbers, carrying runtime error counts and accumulated logs from the workflow's tracked state.

Test plan

  • Trigger a RAG run from the configs list — row flips to Running, % advances, per-phase lines update under Details.
  • Refresh the configs list page mid-run — row stays Running with up-to-date counters (the new reflector).
  • Kick off 5+ RAG configs that share an extractor — all complete with accurate counters (no phantom "N documents could not be processed").
  • Trigger the same RAG config twice — the second launch supersedes the first (no stacked rows in the jobs panel).
  • Cause a real per-item failure (bad doc) — error appears under View Errors on both the RAG dialog and the job row.
  • Run from MCP / assistant via GET .../run — receives {kiln_job_tracking_id} JSON.

🤖 Generated with Claude Code

leonardmq and others added 7 commits June 4, 2026 19:33
- New RagJobWorker (compute_state from on-disk artifacts; run delegates to
  RagWorkflowRunner; per-phase lines + structured RagProgress on metadata).
- New desktop endpoint returns {kiln_job_tracking_id}; old SSE endpoint and
  shared-lib helper deleted.
- JobContext gains report_display / report_metadata_patch for multi-phase
  workers that need to update Details lines / structured state per tick.
- rag_progress_store derives from the project-scoped jobs store now; the
  EventSource throttle is gone (job system handles concurrency).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- Worker accumulates logs across ticks (the runner replaces them per-yield);
  full list now lands in metadata.rag_progress.logs.
- Error-level log entries are also forwarded to ctx.report_error so they
  appear in the per-run error log (View Errors).
- High-water-mark error count carried forward across ticks (defensive).
- Final reconciliation: if `total > success + errors` at end of run, the
  delta is silent skips (no original_file, filtered out, etc.) — count
  them as errors so the job ends up "Completed with errors" instead of a
  misleading plain "Completed" with the progress bar stuck at <100%.
- Frontend reads logs from rag_progress.logs (replace, not append: worker
  already accumulates).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The per-config detail page doesn't surface progress, so it'd be a dead-end
during a run. The configs list page is where the row's live status badge,
% complete, and Run/View Progress button live.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Module-level subscriber pushes any RAG-kind job in the jobs store into the
rag progress store: live progress/logs overlay the disk-derived snapshot,
status flips to "running" for non-terminal jobs, and the terminal mapping
matches what finalize_run did (succeeded+errors → completed_with_errors).

Without this, a page refresh during a run dropped back to the disk-derived
"incomplete" state because the per-spawn watcher only existed for the tab
that started the run. The new reflector picks them up regardless of which
session triggered the spawn.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- Step runners yield absolute cumulative counts for their invocation (was:
  cumulative for the three doc-phases but per-batch deltas for indexing).
  Removes the inconsistency that forced different merge logic per phase.
- Workflow's update_workflow_progress collapses from a four-arm match to a
  single dict-mapped lookup + assignment. baseline + step_progress.* with no
  max() — the step runner's monotonic absolute counts mean the defense
  isn't load-bearing.
- Logs accumulate on RagProgress.logs (was: replaced per yield, forcing
  every consumer to re-accumulate). RagStepRunnerProgress.logs renamed to
  new_logs to reflect the "delta since last tick" semantic; workflow appends
  them to the running list.
- Step runners drop the log-only follow-up yields. New error entries ride
  with the same tick that bumped the count.
- RagJobWorker drops its own log accumulator and the high-water-mark error
  count defense — no longer needed.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
When the run finishes with documents that never went through any phase (no
source file, excluded by config, etc.), we already mark the job as
completed_with_errors via the silent-skip reconciliation — but the View
Errors dialogs were empty, leaving the user with a red badge and no
explanation. Synthesize a single LogMessage so:

- RAG run dialog's log list has the explanation.
- Job-panel View Errors shows it too (via ctx.report_error).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Drops the silent-skip reconciliation in the worker, which was synthesizing
phantom errors ("N documents could not be processed") whenever a parallel
sibling RAG run completed work this config inherits — both runs would
compute_current_progress before any lock was acquired, then sibling A
would finish first and its work would skip-through B's collect_jobs,
leaving B's per-phase counters frozen at the stale pre-lock values.

Two snapshots now keep the counters honest:

- RagWorkflowRunner.run() recomputes initial_progress right after acquiring
  the workflow lock, so subsequent step ticks count against up-to-date
  on-disk reality instead of the pre-lock estimate.
- RagJobWorker.run() recomputes once more at end of run for the final
  reported numbers. Runtime per-phase error counts and accumulated logs
  carry over from `latest` (disk doesn't persist runtime errors).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@coderabbitai

coderabbitai Bot commented Jun 4, 2026

Copy link
Copy Markdown
Contributor

Important

Review skipped

Draft detected.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 9259379a-520c-4572-91be-954df2162a09

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Use the checkbox below for a quick retry:

  • 🔍 Trigger review
✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch leonard/kil-687-rag-via-job

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request migrates RAG ingestion workflows to the background job system, introducing a new RagJobWorker and updating the frontend stores to track and reflect job progress. The review feedback highlights critical defensive programming improvements, specifically guarding against potential TypeError crashes in compute_state and update_workflow_progress when progress fields are None. Additionally, it is recommended to filter project jobs in reflect_jobs_into_rag_store to only process the latest job per configuration, preventing older jobs from overwriting newer state.

Important

The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.

Comment on lines +111 to +135
async def compute_state(self, params: RagJobParams) -> JobDerivedState:
rag_config, project = _load_rag_config(params)
progress = await compute_current_progress_for_rag_config(project, rag_config)
total = progress.total_document_count
success = progress.total_document_completed_count
# is_complete only when all docs are processed AND all chunks indexed
# (indexing can lag chunking if a prior run died mid-indexing).
chunks_complete = (
progress.total_chunk_count > 0
and progress.total_chunks_indexed_count >= progress.total_chunk_count
) or (
# No chunks at all (e.g. empty dataset) — chunks aren't the gating
# signal; document completion is what matters.
progress.total_chunk_count == 0
)
return JobDerivedState(
total=total,
success=success,
# error left None: per-phase error counts only exist at runtime —
# source-of-truth entities don't persist failures. The registry
# preserves the live count last reported via report_progress,
# keeping View Errors meaningful across a pause.
is_complete=total > 0 and success >= total and chunks_complete,
message=None,
)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Enforce defensive programming by adding fallback values for RagProgress fields. If total_document_count, total_document_completed_count, total_chunk_count, or total_chunks_indexed_count are None (which can happen if they are optional or not yet initialized), operations like total > 0 or success >= total will raise a TypeError and crash the background worker.

    async def compute_state(self, params: RagJobParams) -> JobDerivedState:
        rag_config, project = _load_rag_config(params)
        progress = await compute_current_progress_for_rag_config(project, rag_config)
        total = progress.total_document_count or 0
        success = progress.total_document_completed_count or 0
        total_chunk_count = progress.total_chunk_count or 0
        total_chunks_indexed_count = progress.total_chunks_indexed_count or 0
        # is_complete only when all docs are processed AND all chunks indexed
        # (indexing can lag chunking if a prior run died mid-indexing).
        chunks_complete = (
            total_chunk_count > 0
            and total_chunks_indexed_count >= total_chunk_count
        ) or (
            # No chunks at all (e.g. empty dataset) — chunks aren't the gating
            # signal; document completion is what matters.
            total_chunk_count == 0
        )
        return JobDerivedState(
            total=total,
            success=success,
            # error left None: per-phase error counts only exist at runtime —
            # source-of-truth entities don't persist failures. The registry
            # preserves the live count last reported via report_progress,
            # keeping View Errors meaningful across a pause.
            is_complete=total > 0 and success >= total and chunks_complete,
            message=None,
        )

Comment on lines 822 to 861
def update_workflow_progress(
self, step_name: RagWorkflowStepNames, step_progress: RagStepRunnerProgress
) -> RagProgress:
# merge the simpler step-specific progress with the broader RAG progress
match step_name:
case RagWorkflowStepNames.EXTRACTING:
if step_progress.success_count is not None:
self.current_progress.total_document_extracted_count = max(
self.current_progress.total_document_extracted_count,
step_progress.success_count
+ self.initial_progress.total_document_extracted_count,
)
if step_progress.error_count is not None:
self.current_progress.total_document_extracted_error_count = max(
self.current_progress.total_document_extracted_error_count,
step_progress.error_count
+ self.initial_progress.total_document_extracted_error_count,
)
case RagWorkflowStepNames.CHUNKING:
if step_progress.success_count is not None:
self.current_progress.total_document_chunked_count = max(
self.current_progress.total_document_chunked_count,
step_progress.success_count
+ self.initial_progress.total_document_chunked_count,
)
if step_progress.error_count is not None:
self.current_progress.total_document_chunked_error_count = max(
self.current_progress.total_document_chunked_error_count,
step_progress.error_count
+ self.initial_progress.total_document_chunked_error_count,
)
case RagWorkflowStepNames.EMBEDDING:
if step_progress.success_count is not None:
self.current_progress.total_document_embedded_count = max(
self.current_progress.total_document_embedded_count,
step_progress.success_count
+ self.initial_progress.total_document_embedded_count,
)
if step_progress.error_count is not None:
self.current_progress.total_document_embedded_error_count = max(
self.current_progress.total_document_embedded_error_count,
step_progress.error_count
+ self.initial_progress.total_document_embedded_error_count,
)
case RagWorkflowStepNames.INDEXING:
if step_progress.success_count is not None:
self.current_progress.total_chunks_indexed_count += (
step_progress.success_count
)
if step_progress.error_count is not None:
self.current_progress.total_chunks_indexed_error_count += (
step_progress.error_count
)
case _:
raise_exhaustive_enum_error(step_name)
if step_name not in self._PHASE_FIELDS:
raise ValueError(f"Unhandled enum value: {step_name}")
success_field, error_field = self._PHASE_FIELDS[step_name]

# Step counts are absolute for THIS invocation. Add the baseline once
# to get the overall total. (Old code used max(current, baseline+delta)
# as a defensive monotonicity check, but with absolute counts from the
# step runner that's redundant.)
setattr(
self.current_progress,
success_field,
getattr(self.initial_progress, success_field) + step_progress.success_count,
)
setattr(
self.current_progress,
error_field,
getattr(self.initial_progress, error_field) + step_progress.error_count,
)

self.current_progress.total_document_completed_count = min(
self.current_progress.total_document_extracted_count,
self.current_progress.total_document_chunked_count,
self.current_progress.total_document_embedded_count,
)

self.current_progress.total_chunk_completed_count = (
self.current_progress.total_chunks_indexed_count
)

self.current_progress.logs = step_progress.logs
# Append new logs to the running list instead of replacing it. The
# previous "replace per yield" behavior dropped all prior entries,
# forcing callers to re-accumulate. Now `current_progress.logs` is
# the authoritative full history.
if step_progress.new_logs:
self.current_progress.logs = (
self.current_progress.logs or []
) + step_progress.new_logs
return self.current_progress

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Enforce defensive programming by guarding against None values in self.initial_progress fields. If any of the phase fields are None, adding them to step_progress.success_count or step_progress.error_count will raise a TypeError.

    def update_workflow_progress(
        self, step_name: RagWorkflowStepNames, step_progress: RagStepRunnerProgress
    ) -> RagProgress:
        if step_name not in self._PHASE_FIELDS:
            raise ValueError(f"Unhandled enum value: {step_name}")
        success_field, error_field = self._PHASE_FIELDS[step_name]

        # Step counts are absolute for THIS invocation. Add the baseline once
        # to get the overall total. (Old code used max(current, baseline+delta)
        # as a defensive monotonicity check, but with absolute counts from the
        # step runner that's redundant.)
        setattr(
            self.current_progress,
            success_field,
            (getattr(self.initial_progress, success_field) or 0) + step_progress.success_count,
        )
        setattr(
            self.current_progress,
            error_field,
            (getattr(self.initial_progress, error_field) or 0) + step_progress.error_count,
        )

        self.current_progress.total_document_completed_count = min(
            self.current_progress.total_document_extracted_count,
            self.current_progress.total_document_chunked_count,
            self.current_progress.total_document_embedded_count,
        )
        self.current_progress.total_chunk_completed_count = (
            self.current_progress.total_chunks_indexed_count
        )

        # Append new logs to the running list instead of replacing it. The
        # previous "replace per yield" behavior dropped all prior entries,
        # forcing callers to re-accumulate. Now `current_progress.logs` is
        # the authoritative full history.
        if step_progress.new_logs:
            self.current_progress.logs = (
                self.current_progress.logs or []
            ) + step_progress.new_logs
        return self.current_progress

Comment on lines +65 to +133
function reflect_jobs_into_rag_store($jobs: JobRecord[]): void {
// Group RAG jobs by project so each project's state updates atomically.
const by_project = new Map<string, JobRecord[]>()
for (const job of $jobs) {
const tag = get_tag(job)
if (tag?.kind !== "rag") continue
if (!job.project_id) continue
const list = by_project.get(job.project_id) ?? []
list.push(job)
by_project.set(job.project_id, list)
}

for (const [project_id, project_jobs] of by_project) {
ragProgressStore.updateProjectState(project_id, (state) => {
const progress = { ...state.progress }
const logs = { ...state.logs }
const status = { ...state.status }
const running = { ...state.running_rag_configs }

for (const job of project_jobs) {
const tag = get_tag(job)
if (tag?.kind !== "rag") continue
const rag_config_id = tag.rag_config_id

// Live progress / logs come from the worker's per-tick metadata
// patch. Always overlay when present — the worker's snapshot is
// strictly fresher than whatever load_all_rag_config_progress put
// there at page-mount time.
const rag_progress = (
job.metadata as { rag_progress?: RagProgress } | null
)?.rag_progress
if (rag_progress) {
progress[rag_config_id] = rag_progress
logs[rag_config_id] = rag_progress.logs ?? []
}

if (!is_terminal(job.status)) {
status[rag_config_id] = "running"
running[rag_config_id] = true
} else {
running[rag_config_id] = false
if (job.status === "succeeded") {
const has_errs = (job.progress?.error ?? 0) > 0
// Re-derive from the latest progress for the "succeeded but
// silently incomplete" case (worker reconciles silent skips
// into progress.error, so has_errs catches those too).
const p = progress[rag_config_id]
status[rag_config_id] = has_errs
? "completed_with_errors"
: p
? calculateStatus(p)
: "complete"
} else {
// failed / cancelled
status[rag_config_id] = "completed_with_errors"
}
}
}

return {
...state,
progress,
logs,
status,
running_rag_configs: running,
}
})
}
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

If there are multiple jobs for the same rag_config_id in $jobs (e.g., a completed historical job and a newly started running job), iterating through them in arbitrary order can cause older jobs to overwrite the state of newer ones. We should filter project_jobs to only process the latest job for each rag_config_id based on created_at.

function reflect_jobs_into_rag_store($jobs: JobRecord[]): void {
  // Group RAG jobs by project so each project's state updates atomically.
  const by_project = new Map<string, JobRecord[]>()
  for (const job of $jobs) {
    const tag = get_tag(job)
    if (tag?.kind !== "rag") continue
    if (!job.project_id) continue
    const list = by_project.get(job.project_id) ?? []
    list.push(job)
    by_project.set(job.project_id, list)
  }

  for (const [project_id, project_jobs] of by_project) {
    ragProgressStore.updateProjectState(project_id, (state) => {
      const progress = { ...state.progress }
      const logs = { ...state.logs }
      const status = { ...state.status }
      const running = { ...state.running_rag_configs }

      // Only keep the latest job per rag_config_id to prevent older jobs from overwriting newer ones
      const latest_jobs = new Map<string, JobRecord>()
      for (const job of project_jobs) {
        const tag = get_tag(job)
        if (!tag) continue
        const rag_config_id = tag.rag_config_id
        const existing = latest_jobs.get(rag_config_id)
        if (!existing || (job.created_at && existing.created_at && new Date(job.created_at) > new Date(existing.created_at))) {
          latest_jobs.set(rag_config_id, job)
        }
      }

      for (const job of latest_jobs.values()) {
        const tag = get_tag(job)
        if (tag?.kind !== "rag") continue
        const rag_config_id = tag.rag_config_id

        // Live progress / logs come from the worker's per-tick metadata
        // patch. Always overlay when present — the worker's snapshot is
        // strictly fresher than whatever load_all_rag_config_progress put
        // there at page-mount time.
        const rag_progress = (
          job.metadata as { rag_progress?: RagProgress } | null
        )?.rag_progress
        if (rag_progress) {
          progress[rag_config_id] = rag_progress
          logs[rag_config_id] = rag_progress.logs ?? []
        }

        if (!is_terminal(job.status)) {
          status[rag_config_id] = "running"
          running[rag_config_id] = true
        } else {
          running[rag_config_id] = false
          if (job.status === "succeeded") {
            const has_errs = (job.progress?.error ?? 0) > 0
            // Re-derive from the latest progress for the "succeeded but
            // silently incomplete" case (worker reconciles silent skips
            // into progress.error, so has_errs catches those too).
            const p = progress[rag_config_id]
            status[rag_config_id] = has_errs
              ? "completed_with_errors"
              : p
                ? calculateStatus(p)
                : "complete"
          } else {
            // failed / cancelled
            status[rag_config_id] = "completed_with_errors"
          }
        }
      }

      return {
        ...state,
        progress,
        logs,
        status,
        running_rag_configs: running,
      }
    })
  }
}

@github-actions

github-actions Bot commented Jun 4, 2026

Copy link
Copy Markdown

📊 Coverage Report

Overall Coverage: 92%

Diff: origin/leonard/kil-686-run-eval-via-job...HEAD

  • app/desktop/desktop_server.py (100%)
  • app/desktop/studio_server/jobs/api.py (100%)
  • app/desktop/studio_server/jobs/models.py (90.9%): Missing lines 211
  • app/desktop/studio_server/jobs/registry.py (60.0%): Missing lines 440,446,454-456,462-464
  • app/desktop/studio_server/jobs/workers/rag.py (79.4%): Missing lines 84-88,114-117,120,128,169-170,190
  • app/desktop/studio_server/rag_jobs_api.py (73.9%): Missing lines 59-62,67,94
  • libs/core/kiln_ai/adapters/rag/rag_runners.py (60.0%): Missing lines 339-341,436-440,547-551,858
  • libs/server/kiln_server/document_api.py (100%)

Summary

  • Total: 162 lines
  • Missing: 43 lines
  • Coverage: 73%

Line-by-line

View line-by-line diff coverage

app/desktop/studio_server/jobs/models.py

Lines 207-215

  207         snapshot under `metadata.rag_progress` so the frontend dialog can
  208         keep showing its four per-phase progress bars without parsing the
  209         human-readable `display.secondary` strings.
  210         """
! 211         await self._report_metadata_patch(patch)
  212 
  213     async def report_error(self, error_message: str, **extra: Any) -> None:
  214         """Append one structured error entry to this run's error log.

app/desktop/studio_server/jobs/registry.py

Lines 436-444

  436 
  437         async def report_display(update: JobDisplayUpdate) -> None:
  438             job = self._jobs.get(job_id)
  439             if job is None or job.run_id != run_id:
! 440                 return
  441             # metadata is free-form; the display sub-object is the only piece
  442             # the table renders. Merge in only the fields the worker provided
  443             # so a worker that ticks only `secondary` doesn't blank `primary`.
  444             display = dict(job.metadata.get("display") or {})

Lines 442-450

  442             # the table renders. Merge in only the fields the worker provided
  443             # so a worker that ticks only `secondary` doesn't blank `primary`.
  444             display = dict(job.metadata.get("display") or {})
  445             if update.primary is not None:
! 446                 display["primary"] = update.primary
  447             if update.secondary is not None:
  448                 display["secondary"] = update.secondary
  449             job.metadata = {**job.metadata, "display": display}
  450             self._touch(job)

Lines 450-460

  450             self._touch(job)
  451             self._emit(job)
  452 
  453         async def report_metadata_patch(patch: dict[str, Any]) -> None:
! 454             job = self._jobs.get(job_id)
! 455             if job is None or job.run_id != run_id:
! 456                 return
  457             # Shallow merge — caller provides full sub-object values for the
  458             # keys they want updated. `tag` and `display` (which carry the
  459             # generic identity / row-rendering contract) are not protected
  460             # here, but no worker has a reason to overwrite them; this method

Lines 458-468

  458             # keys they want updated. `tag` and `display` (which carry the
  459             # generic identity / row-rendering contract) are not protected
  460             # here, but no worker has a reason to overwrite them; this method
  461             # is for adding per-kind structured state alongside.
! 462             job.metadata = {**job.metadata, **patch}
! 463             self._touch(job)
! 464             self._emit(job)
  465 
  466         return JobContext(
  467             job_id,
  468             run_id,

app/desktop/studio_server/jobs/workers/rag.py

Lines 80-92

  80     """Resolve the on-disk RagConfig + parent project. Raised errors land in
  81     the job's `failed` state — the same outcome the user would have gotten
  82     from the inline endpoint.
  83     """
! 84     project = project_from_id(params.project_id)
! 85     rag_config = RagConfig.from_id_and_parent_path(params.rag_config_id, project.path)
! 86     if rag_config is None:
! 87         raise ValueError(f"RAG config {params.rag_config_id} not found")
! 88     return rag_config, project
  89 
  90 
  91 class RagJobWorker(JobWorker[RagJobParams, RagJobResult]):
  92     """Background worker that runs a RAG ingestion workflow against one config.

Lines 110-124

  110     result_model = RagJobResult
  111     supports_pause = True
  112 
  113     async def compute_state(self, params: RagJobParams) -> JobDerivedState:
! 114         rag_config, project = _load_rag_config(params)
! 115         progress = await compute_current_progress_for_rag_config(project, rag_config)
! 116         total = progress.total_document_count
! 117         success = progress.total_document_completed_count
  118         # is_complete only when all docs are processed AND all chunks indexed
  119         # (indexing can lag chunking if a prior run died mid-indexing).
! 120         chunks_complete = (
  121             progress.total_chunk_count > 0
  122             and progress.total_chunks_indexed_count >= progress.total_chunk_count
  123         ) or (
  124             # No chunks at all (e.g. empty dataset) — chunks aren't the gating

Lines 124-132

  124             # No chunks at all (e.g. empty dataset) — chunks aren't the gating
  125             # signal; document completion is what matters.
  126             progress.total_chunk_count == 0
  127         )
! 128         return JobDerivedState(
  129             total=total,
  130             success=success,
  131             # error left None: per-phase error counts only exist at runtime —
  132             # source-of-truth entities don't persist failures. The registry

Lines 165-174

  165             # log so they show up under "View Errors" in the jobs panel. The
  166             # runner now accumulates logs on RagProgress.logs, so we slice
  167             # from where we left off.
  168             for log in (progress.logs or [])[forwarded_log_count:]:
! 169                 if log.level == "error":
! 170                     await ctx.report_error(log.message)
  171             forwarded_log_count = len(progress.logs or [])
  172 
  173             errors = _aggregate_errors(progress)
  174             await ctx.report_progress(

Lines 186-194

  186         if latest is None:
  187             # RagWorkflowRunner always yields at least the initial_progress, so
  188             # this shouldn't be reachable — but if it ever is, return zeros
  189             # rather than letting the result_model validation fail.
! 190             return RagJobResult(
  191                 documents_total=0,
  192                 documents_completed=0,
  193                 chunks_indexed=0,
  194                 errors=0,

app/desktop/studio_server/rag_jobs_api.py

Lines 55-71

  55         the jobs SSE bus (`GET /api/jobs/events`). Idempotent: re-running the
  56         same RAG config supersedes any in-flight predecessor (cancel + remove)
  57         instead of stacking a duplicate row in the jobs panel.
  58         """
! 59         project = project_from_id(project_id)
! 60         rag_config = get_rag_config_from_id(project, rag_config_id)
! 61         if rag_config.is_archived:
! 62             raise HTTPException(
  63                 status_code=422,
  64                 detail="This RAG configuration is archived. You must unarchive it to use it.",
  65             )
  66 
! 67         job = await job_registry.create(
  68             "rag",
  69             {
  70                 "project_id": project_id,
  71                 "rag_config_id": rag_config_id,

Lines 90-95

  90             # cancel+restart semantic visible in the panel.
  91             idempotency_key=f"rag:{rag_config_id}",
  92         )
  93 
! 94         return RunRagConfigResponse(kiln_job_tracking_id=job.id)

libs/core/kiln_ai/adapters/rag/rag_runners.py

Lines 335-345

  335                 # into the same yield as the counts — one tick per AsyncJobRunner
  336                 # progress update, instead of interleaving log-only yields.
  337                 new_logs: list[LogMessage] = []
  338                 if observer.get_error_count() > error_idx:
! 339                     errors, error_idx = observer.get_errors(error_idx)
! 340                     for job, error in errors:
! 341                         new_logs.append(
  342                             LogMessage(
  343                                 level="error",
  344                                 message=f"Error extracting document: {job.doc.path}: {error}",
  345                             )

Lines 432-444

  432             )
  433 
  434             error_idx = 0
  435             async for progress in runner.run():
! 436                 new_logs: list[LogMessage] = []
! 437                 if observer.get_error_count() > error_idx:
! 438                     errors, error_idx = observer.get_errors(error_idx)
! 439                     for job, error in errors:
! 440                         new_logs.append(
  441                             LogMessage(
  442                                 level="error",
  443                                 message=f"Error chunking document: {job.extraction.path}: {error}",
  444                             )

Lines 543-555

  543             )
  544 
  545             error_idx = 0
  546             async for progress in runner.run():
! 547                 new_logs: list[LogMessage] = []
! 548                 if observer.get_error_count() > error_idx:
! 549                     errors, error_idx = observer.get_errors(error_idx)
! 550                     for job, error in errors:
! 551                         new_logs.append(
  552                             LogMessage(
  553                                 level="error",
  554                                 message=f"Error embedding document: {job.chunked_document.path}: {error}",
  555                             )

Lines 854-862

  854         # previous "replace per yield" behavior dropped all prior entries,
  855         # forcing callers to re-accumulate. Now `current_progress.logs` is
  856         # the authoritative full history.
  857         if step_progress.new_logs:
! 858             self.current_progress.logs = (
  859                 self.current_progress.logs or []
  860             ) + step_progress.new_logs
  861         return self.current_progress


leonardmq and others added 3 commits June 5, 2026 02:15
- RagJobWorker: thread save_context_for_project into
  build_rag_workflow_runner so background RAG ingestion writes
  (Extraction/ChunkedDocument/ChunkEmbeddings) are git-sync committed
  for auto-sync projects instead of left dirty (mirrors eval worker).
  Adds a regression test locking in the save_context wiring.
- rag_jobs_api: add missing summary="Run RAG Config" and document why
  the run endpoint stays GET (idempotent supersede, writes no project
  files inline). Regenerate api_schema.d.ts for the summary.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant