feat: run RAG via background jobs#1450
Conversation
- New RagJobWorker (compute_state from on-disk artifacts; run delegates to
RagWorkflowRunner; per-phase lines + structured RagProgress on metadata).
- New desktop endpoint returns {kiln_job_tracking_id}; old SSE endpoint and
shared-lib helper deleted.
- JobContext gains report_display / report_metadata_patch for multi-phase
workers that need to update Details lines / structured state per tick.
- rag_progress_store derives from the project-scoped jobs store now; the
EventSource throttle is gone (job system handles concurrency).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- Worker accumulates logs across ticks (the runner replaces them per-yield); full list now lands in metadata.rag_progress.logs. - Error-level log entries are also forwarded to ctx.report_error so they appear in the per-run error log (View Errors). - High-water-mark error count carried forward across ticks (defensive). - Final reconciliation: if `total > success + errors` at end of run, the delta is silent skips (no original_file, filtered out, etc.) — count them as errors so the job ends up "Completed with errors" instead of a misleading plain "Completed" with the progress bar stuck at <100%. - Frontend reads logs from rag_progress.logs (replace, not append: worker already accumulates). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The per-config detail page doesn't surface progress, so it'd be a dead-end during a run. The configs list page is where the row's live status badge, % complete, and Run/View Progress button live. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Module-level subscriber pushes any RAG-kind job in the jobs store into the rag progress store: live progress/logs overlay the disk-derived snapshot, status flips to "running" for non-terminal jobs, and the terminal mapping matches what finalize_run did (succeeded+errors → completed_with_errors). Without this, a page refresh during a run dropped back to the disk-derived "incomplete" state because the per-spawn watcher only existed for the tab that started the run. The new reflector picks them up regardless of which session triggered the spawn. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- Step runners yield absolute cumulative counts for their invocation (was: cumulative for the three doc-phases but per-batch deltas for indexing). Removes the inconsistency that forced different merge logic per phase. - Workflow's update_workflow_progress collapses from a four-arm match to a single dict-mapped lookup + assignment. baseline + step_progress.* with no max() — the step runner's monotonic absolute counts mean the defense isn't load-bearing. - Logs accumulate on RagProgress.logs (was: replaced per yield, forcing every consumer to re-accumulate). RagStepRunnerProgress.logs renamed to new_logs to reflect the "delta since last tick" semantic; workflow appends them to the running list. - Step runners drop the log-only follow-up yields. New error entries ride with the same tick that bumped the count. - RagJobWorker drops its own log accumulator and the high-water-mark error count defense — no longer needed. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
When the run finishes with documents that never went through any phase (no source file, excluded by config, etc.), we already mark the job as completed_with_errors via the silent-skip reconciliation — but the View Errors dialogs were empty, leaving the user with a red badge and no explanation. Synthesize a single LogMessage so: - RAG run dialog's log list has the explanation. - Job-panel View Errors shows it too (via ctx.report_error). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Drops the silent-skip reconciliation in the worker, which was synthesizing
phantom errors ("N documents could not be processed") whenever a parallel
sibling RAG run completed work this config inherits — both runs would
compute_current_progress before any lock was acquired, then sibling A
would finish first and its work would skip-through B's collect_jobs,
leaving B's per-phase counters frozen at the stale pre-lock values.
Two snapshots now keep the counters honest:
- RagWorkflowRunner.run() recomputes initial_progress right after acquiring
the workflow lock, so subsequent step ticks count against up-to-date
on-disk reality instead of the pre-lock estimate.
- RagJobWorker.run() recomputes once more at end of run for the final
reported numbers. Runtime per-phase error counts and accumulated logs
carry over from `latest` (disk doesn't persist runtime errors).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
|
Important Review skippedDraft detected. Please check the settings in the CodeRabbit UI or the ⚙️ Run configurationConfiguration used: Repository UI Review profile: CHILL Plan: Pro Run ID: You can disable this status message by setting the Use the checkbox below for a quick retry:
✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Code Review
This pull request migrates RAG ingestion workflows to the background job system, introducing a new RagJobWorker and updating the frontend stores to track and reflect job progress. The review feedback highlights critical defensive programming improvements, specifically guarding against potential TypeError crashes in compute_state and update_workflow_progress when progress fields are None. Additionally, it is recommended to filter project jobs in reflect_jobs_into_rag_store to only process the latest job per configuration, preventing older jobs from overwriting newer state.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| async def compute_state(self, params: RagJobParams) -> JobDerivedState: | ||
| rag_config, project = _load_rag_config(params) | ||
| progress = await compute_current_progress_for_rag_config(project, rag_config) | ||
| total = progress.total_document_count | ||
| success = progress.total_document_completed_count | ||
| # is_complete only when all docs are processed AND all chunks indexed | ||
| # (indexing can lag chunking if a prior run died mid-indexing). | ||
| chunks_complete = ( | ||
| progress.total_chunk_count > 0 | ||
| and progress.total_chunks_indexed_count >= progress.total_chunk_count | ||
| ) or ( | ||
| # No chunks at all (e.g. empty dataset) — chunks aren't the gating | ||
| # signal; document completion is what matters. | ||
| progress.total_chunk_count == 0 | ||
| ) | ||
| return JobDerivedState( | ||
| total=total, | ||
| success=success, | ||
| # error left None: per-phase error counts only exist at runtime — | ||
| # source-of-truth entities don't persist failures. The registry | ||
| # preserves the live count last reported via report_progress, | ||
| # keeping View Errors meaningful across a pause. | ||
| is_complete=total > 0 and success >= total and chunks_complete, | ||
| message=None, | ||
| ) |
There was a problem hiding this comment.
Enforce defensive programming by adding fallback values for RagProgress fields. If total_document_count, total_document_completed_count, total_chunk_count, or total_chunks_indexed_count are None (which can happen if they are optional or not yet initialized), operations like total > 0 or success >= total will raise a TypeError and crash the background worker.
async def compute_state(self, params: RagJobParams) -> JobDerivedState:
rag_config, project = _load_rag_config(params)
progress = await compute_current_progress_for_rag_config(project, rag_config)
total = progress.total_document_count or 0
success = progress.total_document_completed_count or 0
total_chunk_count = progress.total_chunk_count or 0
total_chunks_indexed_count = progress.total_chunks_indexed_count or 0
# is_complete only when all docs are processed AND all chunks indexed
# (indexing can lag chunking if a prior run died mid-indexing).
chunks_complete = (
total_chunk_count > 0
and total_chunks_indexed_count >= total_chunk_count
) or (
# No chunks at all (e.g. empty dataset) — chunks aren't the gating
# signal; document completion is what matters.
total_chunk_count == 0
)
return JobDerivedState(
total=total,
success=success,
# error left None: per-phase error counts only exist at runtime —
# source-of-truth entities don't persist failures. The registry
# preserves the live count last reported via report_progress,
# keeping View Errors meaningful across a pause.
is_complete=total > 0 and success >= total and chunks_complete,
message=None,
)| def update_workflow_progress( | ||
| self, step_name: RagWorkflowStepNames, step_progress: RagStepRunnerProgress | ||
| ) -> RagProgress: | ||
| # merge the simpler step-specific progress with the broader RAG progress | ||
| match step_name: | ||
| case RagWorkflowStepNames.EXTRACTING: | ||
| if step_progress.success_count is not None: | ||
| self.current_progress.total_document_extracted_count = max( | ||
| self.current_progress.total_document_extracted_count, | ||
| step_progress.success_count | ||
| + self.initial_progress.total_document_extracted_count, | ||
| ) | ||
| if step_progress.error_count is not None: | ||
| self.current_progress.total_document_extracted_error_count = max( | ||
| self.current_progress.total_document_extracted_error_count, | ||
| step_progress.error_count | ||
| + self.initial_progress.total_document_extracted_error_count, | ||
| ) | ||
| case RagWorkflowStepNames.CHUNKING: | ||
| if step_progress.success_count is not None: | ||
| self.current_progress.total_document_chunked_count = max( | ||
| self.current_progress.total_document_chunked_count, | ||
| step_progress.success_count | ||
| + self.initial_progress.total_document_chunked_count, | ||
| ) | ||
| if step_progress.error_count is not None: | ||
| self.current_progress.total_document_chunked_error_count = max( | ||
| self.current_progress.total_document_chunked_error_count, | ||
| step_progress.error_count | ||
| + self.initial_progress.total_document_chunked_error_count, | ||
| ) | ||
| case RagWorkflowStepNames.EMBEDDING: | ||
| if step_progress.success_count is not None: | ||
| self.current_progress.total_document_embedded_count = max( | ||
| self.current_progress.total_document_embedded_count, | ||
| step_progress.success_count | ||
| + self.initial_progress.total_document_embedded_count, | ||
| ) | ||
| if step_progress.error_count is not None: | ||
| self.current_progress.total_document_embedded_error_count = max( | ||
| self.current_progress.total_document_embedded_error_count, | ||
| step_progress.error_count | ||
| + self.initial_progress.total_document_embedded_error_count, | ||
| ) | ||
| case RagWorkflowStepNames.INDEXING: | ||
| if step_progress.success_count is not None: | ||
| self.current_progress.total_chunks_indexed_count += ( | ||
| step_progress.success_count | ||
| ) | ||
| if step_progress.error_count is not None: | ||
| self.current_progress.total_chunks_indexed_error_count += ( | ||
| step_progress.error_count | ||
| ) | ||
| case _: | ||
| raise_exhaustive_enum_error(step_name) | ||
| if step_name not in self._PHASE_FIELDS: | ||
| raise ValueError(f"Unhandled enum value: {step_name}") | ||
| success_field, error_field = self._PHASE_FIELDS[step_name] | ||
|
|
||
| # Step counts are absolute for THIS invocation. Add the baseline once | ||
| # to get the overall total. (Old code used max(current, baseline+delta) | ||
| # as a defensive monotonicity check, but with absolute counts from the | ||
| # step runner that's redundant.) | ||
| setattr( | ||
| self.current_progress, | ||
| success_field, | ||
| getattr(self.initial_progress, success_field) + step_progress.success_count, | ||
| ) | ||
| setattr( | ||
| self.current_progress, | ||
| error_field, | ||
| getattr(self.initial_progress, error_field) + step_progress.error_count, | ||
| ) | ||
|
|
||
| self.current_progress.total_document_completed_count = min( | ||
| self.current_progress.total_document_extracted_count, | ||
| self.current_progress.total_document_chunked_count, | ||
| self.current_progress.total_document_embedded_count, | ||
| ) | ||
|
|
||
| self.current_progress.total_chunk_completed_count = ( | ||
| self.current_progress.total_chunks_indexed_count | ||
| ) | ||
|
|
||
| self.current_progress.logs = step_progress.logs | ||
| # Append new logs to the running list instead of replacing it. The | ||
| # previous "replace per yield" behavior dropped all prior entries, | ||
| # forcing callers to re-accumulate. Now `current_progress.logs` is | ||
| # the authoritative full history. | ||
| if step_progress.new_logs: | ||
| self.current_progress.logs = ( | ||
| self.current_progress.logs or [] | ||
| ) + step_progress.new_logs | ||
| return self.current_progress |
There was a problem hiding this comment.
Enforce defensive programming by guarding against None values in self.initial_progress fields. If any of the phase fields are None, adding them to step_progress.success_count or step_progress.error_count will raise a TypeError.
def update_workflow_progress(
self, step_name: RagWorkflowStepNames, step_progress: RagStepRunnerProgress
) -> RagProgress:
if step_name not in self._PHASE_FIELDS:
raise ValueError(f"Unhandled enum value: {step_name}")
success_field, error_field = self._PHASE_FIELDS[step_name]
# Step counts are absolute for THIS invocation. Add the baseline once
# to get the overall total. (Old code used max(current, baseline+delta)
# as a defensive monotonicity check, but with absolute counts from the
# step runner that's redundant.)
setattr(
self.current_progress,
success_field,
(getattr(self.initial_progress, success_field) or 0) + step_progress.success_count,
)
setattr(
self.current_progress,
error_field,
(getattr(self.initial_progress, error_field) or 0) + step_progress.error_count,
)
self.current_progress.total_document_completed_count = min(
self.current_progress.total_document_extracted_count,
self.current_progress.total_document_chunked_count,
self.current_progress.total_document_embedded_count,
)
self.current_progress.total_chunk_completed_count = (
self.current_progress.total_chunks_indexed_count
)
# Append new logs to the running list instead of replacing it. The
# previous "replace per yield" behavior dropped all prior entries,
# forcing callers to re-accumulate. Now `current_progress.logs` is
# the authoritative full history.
if step_progress.new_logs:
self.current_progress.logs = (
self.current_progress.logs or []
) + step_progress.new_logs
return self.current_progress| function reflect_jobs_into_rag_store($jobs: JobRecord[]): void { | ||
| // Group RAG jobs by project so each project's state updates atomically. | ||
| const by_project = new Map<string, JobRecord[]>() | ||
| for (const job of $jobs) { | ||
| const tag = get_tag(job) | ||
| if (tag?.kind !== "rag") continue | ||
| if (!job.project_id) continue | ||
| const list = by_project.get(job.project_id) ?? [] | ||
| list.push(job) | ||
| by_project.set(job.project_id, list) | ||
| } | ||
|
|
||
| for (const [project_id, project_jobs] of by_project) { | ||
| ragProgressStore.updateProjectState(project_id, (state) => { | ||
| const progress = { ...state.progress } | ||
| const logs = { ...state.logs } | ||
| const status = { ...state.status } | ||
| const running = { ...state.running_rag_configs } | ||
|
|
||
| for (const job of project_jobs) { | ||
| const tag = get_tag(job) | ||
| if (tag?.kind !== "rag") continue | ||
| const rag_config_id = tag.rag_config_id | ||
|
|
||
| // Live progress / logs come from the worker's per-tick metadata | ||
| // patch. Always overlay when present — the worker's snapshot is | ||
| // strictly fresher than whatever load_all_rag_config_progress put | ||
| // there at page-mount time. | ||
| const rag_progress = ( | ||
| job.metadata as { rag_progress?: RagProgress } | null | ||
| )?.rag_progress | ||
| if (rag_progress) { | ||
| progress[rag_config_id] = rag_progress | ||
| logs[rag_config_id] = rag_progress.logs ?? [] | ||
| } | ||
|
|
||
| if (!is_terminal(job.status)) { | ||
| status[rag_config_id] = "running" | ||
| running[rag_config_id] = true | ||
| } else { | ||
| running[rag_config_id] = false | ||
| if (job.status === "succeeded") { | ||
| const has_errs = (job.progress?.error ?? 0) > 0 | ||
| // Re-derive from the latest progress for the "succeeded but | ||
| // silently incomplete" case (worker reconciles silent skips | ||
| // into progress.error, so has_errs catches those too). | ||
| const p = progress[rag_config_id] | ||
| status[rag_config_id] = has_errs | ||
| ? "completed_with_errors" | ||
| : p | ||
| ? calculateStatus(p) | ||
| : "complete" | ||
| } else { | ||
| // failed / cancelled | ||
| status[rag_config_id] = "completed_with_errors" | ||
| } | ||
| } | ||
| } | ||
|
|
||
| return { | ||
| ...state, | ||
| progress, | ||
| logs, | ||
| status, | ||
| running_rag_configs: running, | ||
| } | ||
| }) | ||
| } | ||
| } |
There was a problem hiding this comment.
If there are multiple jobs for the same rag_config_id in $jobs (e.g., a completed historical job and a newly started running job), iterating through them in arbitrary order can cause older jobs to overwrite the state of newer ones. We should filter project_jobs to only process the latest job for each rag_config_id based on created_at.
function reflect_jobs_into_rag_store($jobs: JobRecord[]): void {
// Group RAG jobs by project so each project's state updates atomically.
const by_project = new Map<string, JobRecord[]>()
for (const job of $jobs) {
const tag = get_tag(job)
if (tag?.kind !== "rag") continue
if (!job.project_id) continue
const list = by_project.get(job.project_id) ?? []
list.push(job)
by_project.set(job.project_id, list)
}
for (const [project_id, project_jobs] of by_project) {
ragProgressStore.updateProjectState(project_id, (state) => {
const progress = { ...state.progress }
const logs = { ...state.logs }
const status = { ...state.status }
const running = { ...state.running_rag_configs }
// Only keep the latest job per rag_config_id to prevent older jobs from overwriting newer ones
const latest_jobs = new Map<string, JobRecord>()
for (const job of project_jobs) {
const tag = get_tag(job)
if (!tag) continue
const rag_config_id = tag.rag_config_id
const existing = latest_jobs.get(rag_config_id)
if (!existing || (job.created_at && existing.created_at && new Date(job.created_at) > new Date(existing.created_at))) {
latest_jobs.set(rag_config_id, job)
}
}
for (const job of latest_jobs.values()) {
const tag = get_tag(job)
if (tag?.kind !== "rag") continue
const rag_config_id = tag.rag_config_id
// Live progress / logs come from the worker's per-tick metadata
// patch. Always overlay when present — the worker's snapshot is
// strictly fresher than whatever load_all_rag_config_progress put
// there at page-mount time.
const rag_progress = (
job.metadata as { rag_progress?: RagProgress } | null
)?.rag_progress
if (rag_progress) {
progress[rag_config_id] = rag_progress
logs[rag_config_id] = rag_progress.logs ?? []
}
if (!is_terminal(job.status)) {
status[rag_config_id] = "running"
running[rag_config_id] = true
} else {
running[rag_config_id] = false
if (job.status === "succeeded") {
const has_errs = (job.progress?.error ?? 0) > 0
// Re-derive from the latest progress for the "succeeded but
// silently incomplete" case (worker reconciles silent skips
// into progress.error, so has_errs catches those too).
const p = progress[rag_config_id]
status[rag_config_id] = has_errs
? "completed_with_errors"
: p
? calculateStatus(p)
: "complete"
} else {
// failed / cancelled
status[rag_config_id] = "completed_with_errors"
}
}
}
return {
...state,
progress,
logs,
status,
running_rag_configs: running,
}
})
}
}
📊 Coverage ReportOverall Coverage: 92% Diff: origin/leonard/kil-686-run-eval-via-job...HEAD
Summary
Line-by-lineView line-by-line diff coverageapp/desktop/studio_server/jobs/models.pyLines 207-215 207 snapshot under `metadata.rag_progress` so the frontend dialog can
208 keep showing its four per-phase progress bars without parsing the
209 human-readable `display.secondary` strings.
210 """
! 211 await self._report_metadata_patch(patch)
212
213 async def report_error(self, error_message: str, **extra: Any) -> None:
214 """Append one structured error entry to this run's error log.app/desktop/studio_server/jobs/registry.pyLines 436-444 436
437 async def report_display(update: JobDisplayUpdate) -> None:
438 job = self._jobs.get(job_id)
439 if job is None or job.run_id != run_id:
! 440 return
441 # metadata is free-form; the display sub-object is the only piece
442 # the table renders. Merge in only the fields the worker provided
443 # so a worker that ticks only `secondary` doesn't blank `primary`.
444 display = dict(job.metadata.get("display") or {})Lines 442-450 442 # the table renders. Merge in only the fields the worker provided
443 # so a worker that ticks only `secondary` doesn't blank `primary`.
444 display = dict(job.metadata.get("display") or {})
445 if update.primary is not None:
! 446 display["primary"] = update.primary
447 if update.secondary is not None:
448 display["secondary"] = update.secondary
449 job.metadata = {**job.metadata, "display": display}
450 self._touch(job)Lines 450-460 450 self._touch(job)
451 self._emit(job)
452
453 async def report_metadata_patch(patch: dict[str, Any]) -> None:
! 454 job = self._jobs.get(job_id)
! 455 if job is None or job.run_id != run_id:
! 456 return
457 # Shallow merge — caller provides full sub-object values for the
458 # keys they want updated. `tag` and `display` (which carry the
459 # generic identity / row-rendering contract) are not protected
460 # here, but no worker has a reason to overwrite them; this methodLines 458-468 458 # keys they want updated. `tag` and `display` (which carry the
459 # generic identity / row-rendering contract) are not protected
460 # here, but no worker has a reason to overwrite them; this method
461 # is for adding per-kind structured state alongside.
! 462 job.metadata = {**job.metadata, **patch}
! 463 self._touch(job)
! 464 self._emit(job)
465
466 return JobContext(
467 job_id,
468 run_id,app/desktop/studio_server/jobs/workers/rag.pyLines 80-92 80 """Resolve the on-disk RagConfig + parent project. Raised errors land in
81 the job's `failed` state — the same outcome the user would have gotten
82 from the inline endpoint.
83 """
! 84 project = project_from_id(params.project_id)
! 85 rag_config = RagConfig.from_id_and_parent_path(params.rag_config_id, project.path)
! 86 if rag_config is None:
! 87 raise ValueError(f"RAG config {params.rag_config_id} not found")
! 88 return rag_config, project
89
90
91 class RagJobWorker(JobWorker[RagJobParams, RagJobResult]):
92 """Background worker that runs a RAG ingestion workflow against one config.Lines 110-124 110 result_model = RagJobResult
111 supports_pause = True
112
113 async def compute_state(self, params: RagJobParams) -> JobDerivedState:
! 114 rag_config, project = _load_rag_config(params)
! 115 progress = await compute_current_progress_for_rag_config(project, rag_config)
! 116 total = progress.total_document_count
! 117 success = progress.total_document_completed_count
118 # is_complete only when all docs are processed AND all chunks indexed
119 # (indexing can lag chunking if a prior run died mid-indexing).
! 120 chunks_complete = (
121 progress.total_chunk_count > 0
122 and progress.total_chunks_indexed_count >= progress.total_chunk_count
123 ) or (
124 # No chunks at all (e.g. empty dataset) — chunks aren't the gatingLines 124-132 124 # No chunks at all (e.g. empty dataset) — chunks aren't the gating
125 # signal; document completion is what matters.
126 progress.total_chunk_count == 0
127 )
! 128 return JobDerivedState(
129 total=total,
130 success=success,
131 # error left None: per-phase error counts only exist at runtime —
132 # source-of-truth entities don't persist failures. The registryLines 165-174 165 # log so they show up under "View Errors" in the jobs panel. The
166 # runner now accumulates logs on RagProgress.logs, so we slice
167 # from where we left off.
168 for log in (progress.logs or [])[forwarded_log_count:]:
! 169 if log.level == "error":
! 170 await ctx.report_error(log.message)
171 forwarded_log_count = len(progress.logs or [])
172
173 errors = _aggregate_errors(progress)
174 await ctx.report_progress(Lines 186-194 186 if latest is None:
187 # RagWorkflowRunner always yields at least the initial_progress, so
188 # this shouldn't be reachable — but if it ever is, return zeros
189 # rather than letting the result_model validation fail.
! 190 return RagJobResult(
191 documents_total=0,
192 documents_completed=0,
193 chunks_indexed=0,
194 errors=0,app/desktop/studio_server/rag_jobs_api.pyLines 55-71 55 the jobs SSE bus (`GET /api/jobs/events`). Idempotent: re-running the
56 same RAG config supersedes any in-flight predecessor (cancel + remove)
57 instead of stacking a duplicate row in the jobs panel.
58 """
! 59 project = project_from_id(project_id)
! 60 rag_config = get_rag_config_from_id(project, rag_config_id)
! 61 if rag_config.is_archived:
! 62 raise HTTPException(
63 status_code=422,
64 detail="This RAG configuration is archived. You must unarchive it to use it.",
65 )
66
! 67 job = await job_registry.create(
68 "rag",
69 {
70 "project_id": project_id,
71 "rag_config_id": rag_config_id,Lines 90-95 90 # cancel+restart semantic visible in the panel.
91 idempotency_key=f"rag:{rag_config_id}",
92 )
93
! 94 return RunRagConfigResponse(kiln_job_tracking_id=job.id)libs/core/kiln_ai/adapters/rag/rag_runners.pyLines 335-345 335 # into the same yield as the counts — one tick per AsyncJobRunner
336 # progress update, instead of interleaving log-only yields.
337 new_logs: list[LogMessage] = []
338 if observer.get_error_count() > error_idx:
! 339 errors, error_idx = observer.get_errors(error_idx)
! 340 for job, error in errors:
! 341 new_logs.append(
342 LogMessage(
343 level="error",
344 message=f"Error extracting document: {job.doc.path}: {error}",
345 )Lines 432-444 432 )
433
434 error_idx = 0
435 async for progress in runner.run():
! 436 new_logs: list[LogMessage] = []
! 437 if observer.get_error_count() > error_idx:
! 438 errors, error_idx = observer.get_errors(error_idx)
! 439 for job, error in errors:
! 440 new_logs.append(
441 LogMessage(
442 level="error",
443 message=f"Error chunking document: {job.extraction.path}: {error}",
444 )Lines 543-555 543 )
544
545 error_idx = 0
546 async for progress in runner.run():
! 547 new_logs: list[LogMessage] = []
! 548 if observer.get_error_count() > error_idx:
! 549 errors, error_idx = observer.get_errors(error_idx)
! 550 for job, error in errors:
! 551 new_logs.append(
552 LogMessage(
553 level="error",
554 message=f"Error embedding document: {job.chunked_document.path}: {error}",
555 )Lines 854-862 854 # previous "replace per yield" behavior dropped all prior entries,
855 # forcing callers to re-accumulate. Now `current_progress.logs` is
856 # the authoritative full history.
857 if step_progress.new_logs:
! 858 self.current_progress.logs = (
859 self.current_progress.logs or []
860 ) + step_progress.new_logs
861 return self.current_progress
|
…/Kiln into leonard/kil-687-rag-via-job
- RagJobWorker: thread save_context_for_project into build_rag_workflow_runner so background RAG ingestion writes (Extraction/ChunkedDocument/ChunkEmbeddings) are git-sync committed for auto-sync projects instead of left dirty (mirrors eval worker). Adds a regression test locking in the save_context wiring. - rag_jobs_api: add missing summary="Run RAG Config" and document why the run endpoint stays GET (idempotent supersede, writes no project files inline). Regenerate api_schema.d.ts for the summary. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Summary
RagJobWorkerruns the existingRagWorkflowRunnerunder the job system, with idempotency keyed onrag_config_id(re-launches supersede in-flight predecessors).GET /api/projects/{project_id}/rag_configs/{rag_config_id}/runmoved out of the sharedkiln_server.document_apiinto a new desktop-onlyrag_jobs_api. Returns{kiln_job_tracking_id}immediately instead of streaming SSE;build_rag_workflow_runnerstays in the shared lib as a pure helper.rag_progress_storeno longer opens per-configEventSources — it derives live progress, logs, and status from the project-scoped\$jobsstore via a module-level reflector. Refreshing the configs list page now picks up runs spawned from any session.JobContextgrowsreport_display(primary?, secondary?)andreport_metadata_patch(patch)for workers that need to rewrite the row's Details lines or attach structured per-kind state (RAG stamps the fullRagProgresssnapshot undermetadata.rag_progressso the existing four-bar dialog keeps working).rag_runners.pysimplified: step runners yield absolute cumulative counts (was cumulative for the three doc-phases but per-batch deltas for indexing); workflowupdate_workflow_progresscollapses from a four-arm match to a dict-mapped uniform assignment; logs accumulate onRagProgress.logsinstead of being replaced per yield.collect_jobs, so B's per-phasestep.success_countstays 0). The workflow now recomputesinitial_progressafter acquiring the workflow lock; the worker also re-snapshots once more at end of run for the final reported numbers, carrying runtime error counts and accumulated logs from the workflow's tracked state.Test plan
GET .../run— receives{kiln_job_tracking_id}JSON.🤖 Generated with Claude Code